解决python3 pika之连接断开的问题
问题描述
在消费rabbitMQ队列时,每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQserver发送ack信号以dequeue本条消息。
问题就发生在发送ack操作时,程序提示链接已被断开或socketerror。
源码示例
#!/usr/bin #coding:utf-8 importpika importtime USER='guest' PWD='guest' TEST_QUEUE='just4test' defcallback(ch,method,properties,body): print(body) time.sleep(600) ch.basic_publish('',routing_key=TEST_QUEUE,body="fortest") ch.basic_ack(delivery_tag=method.delivery_tag) deftest_main(): s_conn=pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', credentials=pika.PlainCredentials(USER,PWD))) chan=s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_publish('',routing_key=TEST_QUEUE,body="fortest") chan.basic_consume(callback,queue=TEST_QUEUE) chan.start_consuming() if__name__=="__main__": test_main()
运行一段时间后,就会报错:
[ERROR][pika.adapters.base_connection][2017-08-1812:33:49]Errorevent25,None [CRITICAL][pika.adapters.base_connection][2017-08-1812:33:49]Triedtohandleanerrorwherenoerrorexisted [ERROR][pika.adapters.base_connection][2017-08-1812:33:49]FatalSocketError:BrokenPipeError(32,'Brokenpipe')
问题排查
猜测:pika客户端没有及时发送心跳,连接被server断开
一开始修改了heartbeat_interval参数值,示例如下:
deftest_main(): s_conn=pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.PlainCredentials(USER,PWD))) #....
修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;
于是又加了个心跳线程,示例如下:
#!/usr/bin #coding:utf-8 importpika importtime importlogging importthreading USER='guest' PWD='guest' TEST_QUEUE='just4test' classHeartbeat(threading.Thread): def__init__(self,connection): super(Heartbeat,self).__init__() self.lock=threading.Lock() self.connection=connection self.quitflag=False self.stopflag=True self.setDaemon(True) defrun(self): whilenotself.quitflag: time.sleep(10) self.lock.acquire() ifself.stopflag: self.lock.release() continue try: self.connection.process_data_events() exceptExceptionasex: logging.warn("Errorformat:%s"%(str(ex))) self.lock.release() return self.lock.release() defstartHeartbeat(self): self.lock.acquire() ifself.quitflag==True: self.lock.release() return self.stopflag=False self.lock.release() defcallback(ch,method,properties,body): logging.info("recv_body:%s"%body) time.sleep(600) ch.basic_ack(delivery_tag=method.delivery_tag) deftest_main(): s_conn=pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.PlainCredentials(USER,PWD))) chan=s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback, queue=TEST_QUEUE) heartbeat=Heartbeat(s_conn) heartbeat.start()#开启心跳线程 heartbeat.startHeartbeat() chan.start_consuming() if__name__=="__main__": test_main()
尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。
去看它的api,看到heartbeat_interval的解析:
:paramintheartbeat_interval:Howoftentosendheartbeats. Minbetweenthisvalueandserver'sproposal willbeused.Use0todeactivateheartbeats andNonetoacceptserver'sproposal.
按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了,它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接,而不是说pika会按这个间隔来发心跳。结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。
如果不指定heartbeat_interval,它默认为None,意味着按rabbitMQserver的配置来检测心跳是否正常。
如果设置heartbeat_interval=0,意味着不检测心跳,server端将不会主动断开连接。
以上这篇解决python3pika之连接断开的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。