SpringBoot 集成MQTT配置
本文内容纲要:
-1.前言
-2.MQTT介绍
-3.SpringBoot集成MQTT
-3.1导入mqtt库
-3.2配置MQTT订阅者
-3.3配置MQTT发布者
-3.4MQTT消息处理和发送
-4.开发常见问题
-4.1MQTT每次重连失败都会增长线程数
-4.2MQTT消息量大存在消息丢失的情况
目录
-
- 前言
-
- MQTT介绍
-
- SpringBoot集成MQTT
-
3.1导入mqtt库
-
3.2配置MQTT订阅者
-
3.3配置MQTT发布者
-
3.4MQTT消息处理和发送
- 3.4.1消息处理
- 3.4.1消息发送
-
- 开发常见问题
- 4.1MQTT每次重连失败都会增长线程数
- 4.2MQTT消息量大存在消息丢失的情况
1.前言
公司的IOT平台主要采用MQTT(消息队列遥测传输)对底层的驱动做命令下发和数据采集。也用到了redis、zeroMQ、nats等消息中间件。今天先整理SpringBoot集成MQTT笔记和工作中遇到的问题。
2.MQTT介绍
MQTTisamachine-to-machine(M2M)/"InternetofThings"connectivityprotocol.Itwasdesignedasanextremelylightweightpublish/subscribemessagingtransport.Itisusefulforconnectionswithremotelocationswhereasmallcodefootprintisrequiredand/ornetworkbandwidthisatapremium.
官网地址:http://mqtt.org/、https://www.mqtt.com/
MQTT除了具备大部分消息中间件拥有的功能外,其最大的特点就是小型传输。以减少开销,减低网络流量的方式去满足低带宽、不稳定的网络远程传输。
MQTT服务器有很多,比如Apache-Apollo和EMQX,ITDragon龙目前使用的时EMQX作为MQTT的服务器。使用也很简单,下载解压后,进入bin目录执行emqxconsole启动服务。
MQTT调试工具可以用MQTTBox
3.SpringBoot集成MQTT
3.1导入mqtt库
第一步:导入面向企业应用集成库和对应mqtt集成库
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.integration:spring-integration-mqtt')
这里要注意spring-integration-mqtt的版本。因为会存在org.eclipse.paho.client.mqttv3修复了一些bug,并迭代了新版本。但spring-integration-mqtt并没有及时更新的情况。修改方法如下
compile("org.springframework.integration:spring-integration-mqtt"){
excludegroup:"org.eclipse.paho",module:"org.eclipse.paho.client.mqttv3"
}
compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2")
第二步:MQTT连接配置文件
#MQTTConfig
mqtt.server=tcp://x.x.x.x:1883
mqtt.username=xxx
mqtt.password=xxx
mqtt.client-id=clientID
mqtt.cache-number=100
mqtt.message.topic=itDragon/tags/cov
3.2配置MQTT订阅者
Inbound入站消息适配器
第一步:配置MQTT客户端工厂类DefaultMqttPahoClientFactory
第二步:配置MQTT入站消息适配器MqttPahoMessageDrivenChannelAdapter
第三步:定义MQTT入站消息通道MessageChannel
第四步:声明MQTT入站消息处理器MessageHandler
以下有些配置是冲突或者重复的,主要是体现一些重要配置。
packagecom.itdragon.server.config
importcom.itdragon.server.message.ITDragonMQTTMessageHandler
importorg.eclipse.paho.client.mqttv3.MqttConnectOptions
importorg.springframework.beans.factory.annotation.Value
importorg.springframework.context.annotation.Bean
importorg.springframework.context.annotation.Configuration
importorg.springframework.integration.annotation.ServiceActivator
importorg.springframework.integration.channel.DirectChannel
importorg.springframework.integration.core.MessageProducer
importorg.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
importorg.springframework.integration.mqtt.core.MqttPahoClientFactory
importorg.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
importorg.springframework.integration.mqtt.support.DefaultPahoMessageConverter
importorg.springframework.messaging.MessageChannel
importorg.springframework.messaging.MessageHandler
importjava.time.Instant
@Configuration
classMQTTConfig{
@Value("\${mqtt.server}")
lateinitvarmqttServer:String
@Value("\${mqtt.user-name}")
lateinitvarmqttUserName:String
@Value("\${mqtt.password}")
lateinitvarmqttUserPassword:String
@Value("\${mqtt.client-id}")
lateinitvarclientID:String
@Value("\${mqtt.cache-number}")
lateinitvarmaxMessageInFlight:String
@Value("\${mqtt.message.topic}")
lateinitvarmessageTopic:String
/**
*配置DefaultMqttPahoClientFactory
*1.配置基本的链接信息
*2.配置maxInflight,在mqtt消息量比较大的情况下将值设大
*/
funmqttClientFactory():MqttPahoClientFactory{
valmqttConnectOptions=MqttConnectOptions()
//配置mqtt服务端地址,登录账号和密码
mqttConnectOptions.serverURIs=arrayOf(mqttServer)
mqttConnectOptions.userName=mqttUserName
mqttConnectOptions.password=mqttUserPassword.toCharArray()
//配置最大不确定接收消息数量,默认值10,qos!=0时生效
mqttConnectOptions.maxInflight=maxMessageInFlight.toInt()
valfactory=DefaultMqttPahoClientFactory()
factory.connectionOptions=mqttConnectOptions
returnfactory
}
/**
*配置Inbound入站,消费者基本连接配置
*1.通过DefaultMqttPahoClientFactory初始化入站通道适配器
*2.配置超时时长,默认30000毫秒
*3.配置Paho消息转换器
*4.配置发送数据的服务质量0~2
*5.配置订阅通道
*/
@Bean
funitDragonMqttInbound():MessageProducer{
//初始化入站通道适配器,使用的是EclipsePahoMQTT客户端库
valadapter=MqttPahoMessageDrivenChannelAdapter(clientID+Instant.now().toEpochMilli(),mqttClientFactory(),messageTopic)
//设置连接超时时长(默认30000毫秒)
adapter.setCompletionTimeout(30000)
//配置默认Paho消息转换器(qos=0,retain=false,charset=UTF-8)
adapter.setConverter(DefaultPahoMessageConverter())
//设置服务质量
//0最多一次,数据可能丢失;
//1至少一次,数据可能重复;
//2只有一次,有且只有一次;最耗性能
adapter.setQos(0)
//设置订阅通道
adapter.outputChannel=itDragonMqttInputChannel()
returnadapter
}
/**
*配置Inbound入站,消费者订阅的消息通道
*/
@Bean
funitDragonMqttInputChannel():MessageChannel{
returnDirectChannel()
}
/**
*配置Inbound入站,消费者的消息处理器
*1.使用@ServiceActivator注解,表明所修饰的方法用于消息处理
*2.使用inputChannel值,表明从指定通道中取值
*3.利用函数式编程的思路,解耦MessageHandler的业务逻辑
*/
@Bean
@ServiceActivator(inputChannel="itDragonMqttInputChannel")
funcommandDataHandler():MessageHandler{
/*returnMessageHandler{message->
println(message.payload)
}*/
returnITDragonMQTTMessageHandler()
}
}
注意:
- 1)MQTT的客户端ID要唯一。
- 2)MQTT在消息量大的情况下会出现消息丢失的情况。
- 3)MessageHandler注意解耦问题。
3.3配置MQTT发布者
Outbound出站消息适配器
第一步:配置Outbound出站,出站通道适配器
第二步:配置Outbound出站,发布者发送的消息通道
第三步:对外提供推送消息的接口
在原有的MQTTConfig配置类的集成上补充以下内容
/**
*配置Outbound出站,出站通道适配器
*1.通过MqttPahoMessageHandler初始化出站通道适配器
*2.配置异步发送
*3.配置默认的服务质量
*/
@Bean
@ServiceActivator(inputChannel="itDragonMqttOutputChannel")
funitDragonMqttOutbound():MqttPahoMessageHandler{
//初始化出站通道适配器,使用的是EclipsePahoMQTT客户端库
valmessageHandler=MqttPahoMessageHandler(clientID+Instant.now().toEpochMilli()+"_set",mqttClientFactory())
//设置异步发送,默认是false(发送时阻塞)
messageHandler.setAsync(true)
//设置默认的服务质量
messageHandler.setDefaultQos(0)
returnmessageHandler
}
/**
*配置Outbound出站,发布者发送的消息通道
*/
@Bean
funitDragonMqttOutputChannel():MessageChannel{
returnDirectChannel()
}
/**
*对外提供推送消息的接口
*1.使用@MessagingGateway注解,配置MQTTMessageGateway消息推送接口
*2.使用defaultRequestChannel值,调用时将向其发送消息的默认通道
*3.配置灵活的topic主题
*/
@MessagingGateway(defaultRequestChannel="itDragonMqttOutputChannel")
interfaceMQTTMessageGateway{
funsendToMqtt(data:String,@Header(MqttHeaders.TOPIC)topic:String)
funsendToMqtt(data:String,@Header(MqttHeaders.QOS)qos:Int,@Header(MqttHeaders.TOPIC)topic:String)
}
注意:
- 1)发布者和订阅者的客户端ID不能相同。
- 2)消息的推送建议采用异步的方式。
- 3)消息的推送方法可以只传payload消息体,但需要配置setDefaultTopic。
3.4MQTT消息处理和发送
3.4.1消息处理
为了让消息处理函数和MQTT配置解耦,这里提供MessageHandler注册类,将消息处理的业务逻辑以函数式编程的思维注册到Handler中。
packagecom.itdragon.server.message
importorg.springframework.messaging.Message
importorg.springframework.messaging.MessageHandler
classITDragonMQTTMessageHandler:MessageHandler{
privatevarhandler:((String)->Unit)?=null
funregisterHandler(handler:(String)->Unit){
this.handler=handler
}
overridefunhandleMessage(message:Message<*>){
handler?.run{this.invoke(message.payload.toString())}
}
}
注册MessageHandler
packagecom.itdragon.server.message
importorg.slf4j.LoggerFactory
importorg.springframework.beans.factory.annotation.Autowired
importorg.springframework.stereotype.Service
importjavax.annotation.PostConstruct
@Service
classITDragonMessageDispatcher{
privatevallogger=LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java)
@Autowired
lateinitvaritDragonMQTTMessageHandler:ITDragonMQTTMessageHandler
@PostConstruct
funinit(){
itDragonMQTTMessageHandler.registerHandler{itDragonMsgHandler(it)}
}
funitDragonMsgHandler(message:String){
logger.info("itdragonmqttreceivemessage:$message")
try{
//todo
}catch(ex:Exception){
ex.printStackTrace()
}
}
}
3.4.1消息发送
注入MQTT的MessageGateway,然后推送消息。
@Autowired
lateinitvarmqttGateway:MQTTConfig.MQTTMessageGateway
@Scheduled(fixedDelay=10*1000)
funsendMessage(){
mqttGateway.sendToMqtt("HelloITDragon${Instant.now()}","itDragon/tags/cov/set")
}
4.开发常见问题
4.1MQTT每次重连失败都会增长线程数
项目上线一段时间后,客户的服务器严重卡顿。原因是客户服务断网后,MQTT在每次尝试重连的过程中一直在创建新的线程,导致一个Java服务创建了上万个线程。解决方案是更新了org.eclipse.paho.client.mqttv3的版本,也是"3.1导入mqtt库"中提到的。后续就没有出现这个问题了。
4.2MQTT消息量大存在消息丢失的情况
MQTT的消息量大的情况下,既要保障数据的完整,又要保障性能的稳定。光从MQTT本身上来说,很难做到鱼和熊掌不可兼得。ITDragon龙先要理清需求:
- 1)数据的完整性,主要用于能耗的统计、报警的分析
- 2)性能的稳定性,服务器不挂????
在消息量大的情况下,ITDragon龙可以将服务质量设置成0(最多一次)以减少消息确认的开销,用来保证系统的稳定性。
将消息的服务质量设置成0后,会让消息的丢失可能性变得更大,如何保证数据的完整性?其实ITDragon龙可以在往MQTT通道推送消息之前,先将底层驱动采集的数据先异步保存到Inflxudb数据库中。
还有就是每次发送消息量不能太大,太大也会导致消息丢失。最直接的就是后端报错,比如:java.io.EOFException
和toolargemessage:xxxbytes
。但是有的场景后端没有报错,前端订阅的mqtt也没收到消息。最麻烦的是mqttbox工具因为数据量太大直接卡死。一时间真不知道把锅甩给谁。其实我们可以将消息拆包一批批发送。可以缓解这个问题????。
其实采集的数据消息,若在这一批推送过程中丢失。也会在下一批推送过程中补上。命令下发也是一样,如果下发失败,再重写下发一次。毕竟消息的丢失并不是必现的情况。也是小概率事件,系统的稳定性才是最重要的。
本文内容总结:1.前言,2.MQTT介绍,3.SpringBoot集成MQTT,3.1导入mqtt库,3.2配置MQTT订阅者,3.3配置MQTT发布者,3.4MQTT消息处理和发送,4.开发常见问题,4.1MQTT每次重连失败都会增长线程数,4.2MQTT消息量大存在消息丢失的情况,
原文链接:https://www.cnblogs.com/itdragon/p/12463050.html