python 多进程队列数据处理详解
我就废话不多说了,直接上代码吧!
#-*-coding:utf8-*-
importpaho.mqtt.clientasmqtt
frommultiprocessingimportProcess,Queue
importtime,random,os
importcamera_person_num
MQTTHOST="172.19.4.4"
MQTTPORT=1883
mqttClient=mqtt.Client()
q=Queue()
#连接MQTT服务器
defon_mqtt_connect():
mqttClient.connect(MQTTHOST,MQTTPORT,60)
mqttClient.loop_start()
#消息处理函数
defon_message_come(lient,userdata,msg):
#print(msg.topic+":"+str(msg.payload.decode("utf-8")))
q.put(msg.payload.decode("utf-8"))#放入队列
print("产生消息",msg.payload.decode("utf-8"))
#消息处理开启多进程
#p=Process(target=talk,args=("/camera/person/num/result",msg.payload.decode("utf-8")))
#p.start()
defconsumer(q,pid):
print("开启消费序列进程",pid)
whileTrue:
msg=q.get()
#p=Process(target=talk,args=("/camera/person/num/result",msg,pid))
#p.start()
talk("/camera/person/num/result",msg,pid)
#subscribe消息订阅
defon_subscribe():
mqttClient.subscribe("test123",1)#主题为"test"
mqttClient.on_message=on_message_come#消息到来处理函数
#publish消息发布
defon_publish(topic,msg,qos):
mqttClient.publish(topic,msg,qos);
#多进程中发布消息需要重新初始化mqttClient
deftalk(topic,msg,pid):
cameraPsersonNum=camera_person_num.CameraPsersonNum(msg)
t_max,t_mean,t_min=cameraPsersonNum.personNum()
#time.sleep(20)
print("消费消息",pid,msg)
mqttClient2=mqtt.Client()
mqttClient2.connect(MQTTHOST,MQTTPORT,60)
mqttClient2.loop_start()
mqttClient2.publish(topic,'{"max":'+str(t_max)+',"mean":'+str(t_mean)+',"min:"'+t_min+'}',1)
mqttClient2.disconnect()
defmain():
on_mqtt_connect()
on_subscribe()
foriinrange(1,3):
c1=Process(target=consumer,args=(q,i))
c1.start()
whileTrue:
pass
if__name__=='__main__':
main()
以上这篇python多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。