Docker MQTT安装使用教程
MQTT简介
MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。
Docker安装RabbitMQ配置MQTT
使用RabbitMQ作为MQTT服务端,EclipsePaho作为客户端。宿主机系统为ubuntu16.04
Docker下载镜像
dockerpulldaocloud.io/library/rabbitmq:3.7.4
启动RabbitMQ
dockerrun-d--hostnamemy-rabbit--namesome-rabbit-p15672:15672-p5672:5672-p1883:1883-p15675:15675daocloud.io/library/rabbitmq:3.7.4
注意映射容器端口
- 15672是rabbitmqmanagement管理界面默认访问端口
- 5672是amqp默认端口
- 1883是mqtttcp协议默认端口
- 15675是web_mqttwebsocket协议默认端口
启用插件
默认安装后我们需要手动开启rabbitmq_management插件,rabbitmq_mqtt插件和rabbitmq_web_mqtt插件。
执行如下三条命令
dockerexec<容器ID>rabbitmq-pluginsenablerabbitmq_management dockerexec<容器ID>rabbitmq-pluginsenablerabbitmq_mqtt dockerexec<容器ID>rabbitmq-pluginsenablerabbitmq_web_mqtt
当然你也可以写个脚本start.sh,复制到容器中
/usr/sbin/rabbitmq-pluginsenablerabbitmq_management /usr/sbin/rabbitmq-pluginsenablerabbitmq_mqtt /usr/sbin/rabbitmq-pluginsenablerabbitmq_web_mqtt
进入容器执行这个脚本。
shstart.sh
开放宿主机端口
firewall-cmd--zone=public--add-port=15672/tcp--permanent firewall-cmd--zone=public--add-port=5672/tcp--permanent firewall-cmd--zone=public--add-port=1883/tcp--permanent firewall-cmd--zone=public--add-port=15675/tcp--permanent firewall-cmd--reload
PythonMQTT客户端实现
安装python包
pipinstallpaho-mqtt
发送数据demo(消费者)
#使用前需要启动hbase和thrift服务器
#启动hbase在cd/usr/local/hbase下bin/start-hbase.sh默认端口为60000
#启动thrift服务器cd/usr/local/hbase/bin执行./hbase-daemon.shstartthrift默认端口为9090
importsys
importos
dir_common=os.path.split(os.path.realpath(__file__))[0]+'/../'
sys.path.append(dir_common)#将根目录添加到系统目录,才能正常引用common文件夹
importargparse#
importlogging
importtime,datetime
fromcommon.py_logimportinit_logger,init_console_logger
fromcommon.configimport*
fromcommon.py_hbaseimportPyHbase
importtime,json
fromcommon.py_rabbitimportRabbit_Consumer
importpaho.mqtt.clientasmqtt
importtime
HOST="192.168.2.46"
PORT=1883
defclient_loop():
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
client=mqtt.Client(client_id)#ClientId不能重复,所以使用当前时间
client.username_pw_set("guest","guest")#必须设置,否则会返回「Connectedwithresultcode4」
client.on_connect=on_connect
client.on_message=on_message
client.connect(HOST,PORT,60)
client.loop_forever()
defon_connect(client,userdata,flags,rc):
print("Connectedwithresultcode"+str(rc))
client.subscribe("test")
defon_message(client,userdata,msg):
print(msg.topic+""+msg.payload.decode("utf-8"))
if__name__=='__main__':
client_loop()
接收数据demo(生产者)
importsys
importos
dir_common=os.path.split(os.path.realpath(__file__))[0]+'/../'
sys.path.append(dir_common)#将根目录添加到系统目录,才能正常引用common文件夹
importpaho.mqtt.clientasmqtt
importtime
HOST="192.168.2.46"
PORT=1883
defclient_loop():
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
client=mqtt.Client(client_id)#ClientId不能重复,所以使用当前时间
client.username_pw_set("guest","guest")#必须设置,否则会返回「Connectedwithresultcode4」
client.on_connect=on_connect
client.on_message=on_message
client.connect(HOST,PORT,60)
client.loop_forever()
defon_connect(client,userdata,flags,rc):
print("Connectedwithresultcode"+str(rc))
client.subscribe("test")
defon_message(client,userdata,msg):
print(msg.topic+""+msg.payload.decode("utf-8"))
if__name__=='__main__':
client_loop()
生产者demo
#importpaho.mqtt.clientasmqtt
importpaho.mqtt.publishaspublish
importtime
HOST="192.168.2.46"
PORT=1883
defon_connect(client,userdata,flags,rc):
print("Connectedwithresultcode"+str(rc))
client.subscribe("test")
defon_message(client,userdata,msg):
print(msg.topic+""+msg.payload.decode("utf-8"))
if__name__=='__main__':
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
#client=mqtt.Client(client_id)#ClientId不能重复,所以使用当前时间
#client.username_pw_set("guest","guest")#必须设置,否则会返回「Connectedwithresultcode4」
#client.on_connect=on_connect
#client.on_message=on_message
#client.connect(HOST,PORT,60)
#client.publish("test","你好MQTT",qos=0,retain=False)#发布消息
publish.single("test","你好MQTT",qos=1,hostname=HOST,port=PORT,client_id=client
官方文档:
mqtthttp://www.rabbitmq.com/mqtt.html
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。如果你想了解更多相关内容请查看下面相关链接