Python RabbitMQ实现简单的进程间通信示例
RabbitMQ 消息队列
PY
threading Queue
进程 Queue:父进程与子进程,或同一父进程下的多个子进程进行交互
缺点:两个不同Python文件不能通过上面两个Queue进行交互
Erlang
RabbitMQ 是基于 Erlang 语言开发的一种消息中间件
Windows 中需要先安装 Erlang 才能使用
rabbitmq-server start
安装 Python module
pip install pika
or
easy_install pika
or
源码
RabbitMQ 默认 AMQP 端口为 5672,Web 管理界面默认端口为 15672
查看当前时刻的队列数
rabbitmqctl.bat list_queues
exchange
在定义的时候就是有类型的,决定到底哪些queue符合条件,可以接受消息
fanout:所有bind到此exchange的queue都可以收到消息
direct:通过routingkey和exchange决定唯一的queue可以接受消息
topic:所有符合routingkey(此时可以是一个表达式)的routingkey所bind的queue都可以接受消息
表达式符号说明:
# 代表零个或多个单词,* 代表恰好一个单词(routing key 中的单词以 . 分隔)
RPC
remote procedure call(远程过程调用):双向传输,指令 <--------> 指令执行结果
实现方法: 创建两个队列,一个队列收指令,一个队列发送执行结果
用rabbitmq实现简单的生产者消费者模型
1)rabbit_producer.py
# Author: Xuefeng
# Producer: publishes one persistent message to the durable "hello" queue.
import pika

# FIX: the parameters class is pika.ConnectionParameters
# (pika.Connection.Parameters does not exist).
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

# Declare the queue named "hello". durable=True asks the broker to keep the
# queue definition across a broker restart.
channel.queue_declare(queue="hello", durable=True)

# In RabbitMQ a message is never sent directly to a queue; it always goes
# through an exchange. exchange="" selects the default exchange, which routes
# on the queue name given as routing_key.
channel.basic_publish(
    exchange="",
    routing_key="hello",
    body="Helloworld!",
    # FIX: BasicProperties was misspelled as BasicPropreties.
    properties=pika.BasicProperties(
        delivery_mode=2,  # mark the message as persistent
    ),
)
print("[x]sent'Helloworld!'")
connection.close()
2)rabbit_consumer.py
# Author: Xuefeng
# Consumer: receives messages from the durable "hello" queue and acknowledges
# each one manually after handling it.
# NOTE: uses the pika 0.x call signatures (callback-first basic_consume);
# pika >= 1.0 renamed these arguments.
import pika

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

# Same declaration as the producer, so either side may start first.
channel.queue_declare(queue="hello", durable=True)


def callback(ch, method, properties, body):
    """Handle one delivered message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; its delivery_tag identifies the message to ack
    :param properties: message properties
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x]Recieved%r" % body)
    # Manual acknowledgement: tell the broker this message is done.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Fair dispatch: deliver at most one unacknowledged message to this consumer
# at a time, so faster consumers receive more work.
# FIX: the keyword was misspelled as profetch_count.
channel.basic_qos(prefetch_count=1)

# FIX: no_ack=True was removed. With no_ack=True the broker treats a message as
# acknowledged on delivery, so it is lost if this consumer dies, and the
# explicit basic_ack in callback() would be invalid. Manual acks (the default)
# let an unacknowledged message be redelivered to another consumer.
channel.basic_consume(callback, queue="hello")
print("[*]Waitingformessages.ToExitpressCTRL+C")
channel.start_consuming()
用rabbitmq中的fanout模式实现广播模式
1)fanout_rabbit_publish.py
# Author: Xuefeng
# Broadcast mode: the producer sends one message and every consumer currently
# bound to the fanout exchange receives it.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

channel.exchange_declare(exchange="logs", type="fanout")

# FIX: join the command-line words with a space; ''.join() glued them together.
message = " ".join(sys.argv[1:]) or "info:Helloworld!"

# A fanout exchange ignores the routing key, so it is left empty.
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message,
)
print("[x]Send%r" % message)
connection.close()
2)fanout_rabbit_consumer.py
# Author: Xuefeng
# Fanout consumer: binds a private, broker-named queue to the "logs" exchange
# and prints every broadcast message.
# NOTE: uses the pika 0.x call signatures; pika >= 1.0 renamed these arguments.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

# exclusive=True: a queue with a broker-generated unique name, usable only by
# this connection.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Randomqueuename:", queue_name)

channel.queue_bind(exchange="logs", queue=queue_name)


def callback(ch, method, properties, body):
    """Handle one delivered message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; its delivery_tag identifies the message to ack
    :param properties: message properties
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x]Recieved%r" % body)
    # Manual acknowledgement: tell the broker this message is done.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# FIX: consume from the queue that was just bound to the exchange; the original
# consumed from the unrelated "hello" queue and never saw any broadcast.
# FIX: no_ack=True was removed because callback() acknowledges explicitly;
# combining both is contradictory.
channel.basic_consume(callback, queue=queue_name)
print("[*]Waitingformessages.ToExitpressCTRL+C")
channel.start_consuming()
用rabbitmq中的direct模式实现消息过滤模式
1)direct_rabbit_publisher.py
# Author: Xuefeng
# Message-filtering mode: the producer publishes with a severity as routing
# key; only queues bound with that severity receive the message.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs", type="direct")

# First argument is the severity (default "info"); the rest is the message.
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
# FIX: join the command-line words with a space; ''.join() glued them together.
message = " ".join(sys.argv[2:]) or "info:Helloworld!"

channel.basic_publish(
    exchange="direct_logs",
    routing_key=severity,
    body=message,
)
print("[x]Send%r:%r" % (severity, message))
connection.close()
2)direct_rabbit_consumer.py
# Author: Xuefeng
# Direct consumer: binds a private queue to "direct_logs" once per severity
# given on the command line and prints every matching message.
# NOTE: uses the pika 0.x call signatures; pika >= 1.0 renamed these arguments.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs", type="direct")

# exclusive=True: a queue with a broker-generated unique name, usable only by
# this connection.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Randomqueuename:", queue_name)

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s[info][warning][error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested severity.
for severity in severities:
    channel.queue_bind(
        exchange="direct_logs",
        queue=queue_name,
        routing_key=severity,
    )


def callback(ch, method, properties, body):
    """Handle one delivered message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; its delivery_tag identifies the message to ack
    :param properties: message properties
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x]Recieved%r" % body)
    # Manual acknowledgement: tell the broker this message is done.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# FIX: consume from the queue that was just bound; the original consumed from
# the unrelated "hello" queue and never saw messages from direct_logs.
# FIX: no_ack=True was removed because callback() acknowledges explicitly;
# combining both is contradictory.
channel.basic_consume(callback, queue=queue_name)
print("[*]Waitingformessages.ToExitpressCTRL+C")
channel.start_consuming()
用rabbitmq中的topic模式实现细致消息过滤模式
1)topic_rabbit_publisher.py
# Author: Xuefeng
# Fine-grained filtering mode: the producer publishes with a routing key;
# consumers started with patterns such as *.info receive the messages whose
# key matches their pattern.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs", type="topic")

# First argument is the routing key (default "info"); the rest is the message.
binding_key = sys.argv[1] if len(sys.argv) > 1 else "info"
# FIX: join the command-line words with a space; ''.join() glued them together.
message = " ".join(sys.argv[2:]) or "info:Helloworld!"

channel.basic_publish(
    exchange="topic_logs",
    routing_key=binding_key,
    body=message,
)
print("[x]Send%r:%r" % (binding_key, message))
connection.close()
2)topic_rabbit_consumer.py
# Author: Xuefeng
# Topic consumer: binds a private queue to "topic_logs" once per binding
# pattern given on the command line and prints every matching message.
# NOTE: uses the pika 0.x call signatures; pika >= 1.0 renamed these arguments.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs", type="topic")

# exclusive=True: a queue with a broker-generated unique name, usable only by
# this connection.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Randomqueuename:", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage:%s[info][warning][error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested pattern.
for binding_key in binding_keys:
    channel.queue_bind(
        exchange="topic_logs",
        queue=queue_name,
        routing_key=binding_key,
    )


def callback(ch, method, properties, body):
    """Handle one delivered message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; its delivery_tag identifies the message to ack
    :param properties: message properties
    :param body: message payload
    :return: None
    """
    print("------>", ch, method, properties)
    print("[x]Recieved%r" % body)
    # Manual acknowledgement: tell the broker this message is done.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# FIX: consume from the queue that was just bound; the original consumed from
# the unrelated "hello" queue and never saw messages from topic_logs.
# FIX: no_ack=True was removed because callback() acknowledges explicitly;
# combining both is contradictory.
channel.basic_consume(callback, queue=queue_name)
print("[*]Waitingformessages.ToExitpressCTRL+C")
channel.start_consuming()
用rabbitmq实现rpc操作
1)Rpc_rabbit_client.py
# Author: Xuefeng
# RPC client: sends a number to "rpc_queue" and waits on a private reply queue
# for the result.
# NOTE: uses the pika 0.x call signatures; pika >= 1.0 renamed these arguments.
import pika
import time
import uuid


class FibonacciRpcClient(object):
    """Blocking RPC client that asks the server on "rpc_queue" for fib(n)."""

    def __init__(self):
        """Connect to the broker on localhost and start consuming replies."""
        # FIX: the parameters class is pika.ConnectionParameters.
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters("localhost"))
        self.channel = self.connection.channel()

        # Broker-named exclusive queue that receives the command results.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        # Initialised here so on_response() never reads a missing attribute
        # if a message arrives before call() has been used.
        self.response = None
        self.corr_id = None

        # on_response is invoked for every message arriving on the reply queue.
        self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue=self.callback_queue,
        )

    def on_response(self, ch, method, props, body):
        """Store the reply body if it answers the outstanding request.

        :param ch: channel the reply was delivered on (unused)
        :param method: delivery metadata (unused)
        :param props: message properties; correlation_id is compared with the
            id generated by call()
        :param body: reply payload
        :return: None
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send n to the server and block until its reply arrives.

        :param n: value sent (as str(n)) to "rpc_queue"
        :return: the reply converted with int()
        """
        self.response = None
        # A random id lets on_response() match a result to this request.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            # FIX: BasicProperties was misspelled as BasicPropreties, and the
            # property is reply_to, not rely_to.
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n),
        )
        while self.response is None:
            # Non-blocking counterpart of start_consuming(): returns whether
            # or not a message was available.
            self.connection.process_data_events()
            print("nomessage...")
            time.sleep(0.5)
        return int(self.response)


fibonacci_rcp = FibonacciRpcClient()
print("[x]Requestingfib(30)")
response = fibonacci_rcp.call(30)
print("[x]Rec%r" % response)
2)Rpc_rabbit_server.py
# Author: Xuefeng
# RPC server setup: connect to the broker and declare the request queue.
import pika
import sys

# FIX: the parameters class is pika.ConnectionParameters
# (pika.Connection.Parameters does not exist).
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

# Open a channel on the connection.
channel = connection.channel()

# Queue on which clients publish their requests.
channel.queue_declare(queue="rpc_queue")
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively: the original doubly recursive form needed an
    exponential number of calls and never terminated for negative n.

    :param n: non-negative integer index
    :return: the n-th Fibonacci number
    :raises ValueError: if n is negative
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % (n,))
    previous, current = 0, 1
    for _ in range(n):
        previous, current = current, previous + current
    return previous
def on_request(ch, method, props, body):
    """Compute fib(n) for one request and publish the result to the caller.

    :param ch: channel the request was delivered on; also used for the reply
    :param method: delivery metadata; its delivery_tag identifies the message to ack
    :param props: request properties; reply_to names the caller's reply queue
        and correlation_id is echoed back so the caller can match the reply
    :param body: request payload, parsed with int()
    :return: None
    """
    n = int(body)
    print("[.]fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(
        exchange="",
        # FIX: the property is reply_to, not rely_to.
        routing_key=props.reply_to,
        # FIX: BasicProperties was misspelled, and the request attribute is
        # correlation_id, not correlation.
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        # FIX: send the computed result; the original echoed the request body.
        body=str(response),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Deliver at most one unacknowledged request to this server at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")
print("[x]AwaitingRPCrequests")
# FIX: the method is start_consuming (was start_consumeing). It blocks, so the
# unreachable direct-exchange statements that followed it were dropped.
channel.start_consuming()
到此这篇关于PythonRabbitMQ实现简单的进程间通信示例的文章就介绍到这了,更多相关PythonRabbitMQ进程间通信内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!