Python如何在RabbitMQ中创建延迟队列
示例
首先,我们需要设置两个基本通道,一个用于主队列,一个用于延迟队列。在最后的示例中,我添加了一些不需要的其他标志,但这些标志使代码更可靠。如confirmdelivery,delivery_mode和durable。您可以在RabbitMQ手册中找到有关这些的更多信息。
设置通道后,我们将绑定添加到主通道,以便将消息从延迟通道发送到主队列。
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
接下来,我们需要配置延迟通道,以在消息过期后将其转发到主队列。
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={ 'x-message-ttl': 5000, 'x-dead-letter-exchange': 'amq.direct', 'x-dead-letter-routing-key': 'hello' })
x-message-ttl(消息-生存时间)
通常,它用于在特定的持续时间后自动删除队列中的旧消息,但是通过添加两个可选参数,我们可以更改此行为,而是让此参数以毫秒为单位确定消息在延迟队列中保留的时间。
x死信路由键
此变量使我们可以在消息过期后将其传输到其他队列,而不是将其完全删除的默认行为。
x死信交换
此变量确定用于将邮件从hello_delay传输到hello队列的Exchange。
发布到延迟队列
设置完所有基本的Pika参数后,您只需使用基本发布就可以将消息发送到延迟队列。
delay_channel.basic.publish(exchange='', routing_key='hello_delay', body='test', properties={'delivery_mod': 2})
一旦执行了脚本,您应该会在RabbitMQ管理模块中看到以下队列。
例。
from amqpstorm import Connection connection = Connection('127.0.0.1', 'guest', 'guest') #创建普通的“HelloWorld”类型频道。 channel = connection.channel() channel.confirm_deliveries() channel.queue.declare(queue='hello', durable=True) #我们需要将此渠道绑定到交易所,该交易所将用于转移 #来自我们的延迟队列的消息。 channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello') #创建我们的延迟频道。 delay_channel = connection.channel() delay_channel.confirm_deliveries() #在这里我们声明延迟,并为延迟通道进行路由。 delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={ 'x-message-ttl': 5000, #延迟直到消息以毫秒为单位传输。 'x-dead-letter-exchange': 'amq.direct', #Exchange用于将邮件从A传输到B。 'x-dead-letter-routing-key': 'hello' #我们要将邮件传输到的队列的名称。 }) delay_channel.basic.publish(exchange='', routing_key='hello_delay', body='test', properties={'delivery_mode': 2}) print("[x] Sent")