Springboot 配置RabbitMQ文档的方法步骤
简介
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
概念:
- 生产者:消息的产生方,负责将消息推送到消息队列
- 消费者:消息的最终接收方,负责监听队列中的对应消息并消费消息
- 队列:消息的寄存器,负责存放生产者发送的消息
- 交换机:负责根据一定规则分发生产者产生的消息
- 绑定:完成交换机和队列之间的绑定
模式:
- direct:直连模式,用于实例间的任务分发
- topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
- headers:适用规则复杂的分发,用headers里的参数表达规则
- fanout:分发给所有绑定到该exchange上的队列,忽略routingkey
SpringBoot集成RabbitMQ
一、引入maven依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>
二、配置application.properties
#rabbitmq
spring.rabbitmq.host=dev-mq.a.pa.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/message-test/
三、编写AmqpConfiguration配置文件
packagemessage.test.configuration; importorg.springframework.amqp.core.AcknowledgeMode; importorg.springframework.amqp.core.AmqpTemplate; importorg.springframework.amqp.core.Binding; importorg.springframework.amqp.core.BindingBuilder; importorg.springframework.amqp.core.DirectExchange; importorg.springframework.amqp.core.Queue; importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory; importorg.springframework.amqp.rabbit.connection.ConnectionFactory; importorg.springframework.amqp.rabbit.core.RabbitTemplate; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.beans.factory.annotation.Qualifier; importorg.springframework.boot.autoconfigure.amqp.RabbitProperties; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; @Configuration publicclassAmqpConfiguration{ /** *消息编码 */ publicstaticfinalStringMESSAGE_ENCODING="UTF-8"; publicstaticfinalStringEXCHANGE_ISSUE="exchange_message_issue"; publicstaticfinalStringQUEUE_ISSUE_USER="queue_message_issue_user"; publicstaticfinalStringQUEUE_ISSUE_ALL_USER="queue_message_issue_all_user"; publicstaticfinalStringQUEUE_ISSUE_ALL_DEVICE="queue_message_issue_all_device"; publicstaticfinalStringQUEUE_ISSUE_CITY="queue_message_issue_city"; publicstaticfinalStringROUTING_KEY_ISSUE_USER="routing_key_message_issue_user"; publicstaticfinalStringROUTING_KEY_ISSUE_ALL_USER="routing_key_message_issue_all_user"; publicstaticfinalStringROUTING_KEY_ISSUE_ALL_DEVICE="routing_key_message_issue_all_device"; publicstaticfinalStringROUTING_KEY_ISSUE_CITY="routing_key_message_issue_city"; publicstaticfinalStringEXCHANGE_PUSH="exchange_message_push"; publicstaticfinalStringQUEUE_PUSH_RESULT="queue_message_push_result"; @Autowired privateRabbitPropertiesrabbitProperties; @Bean publicQueueissueUserQueue(){ returnnewQueue(QUEUE_ISSUE_USER); } @Bean 
publicQueueissueAllUserQueue(){ returnnewQueue(QUEUE_ISSUE_ALL_USER); } @Bean publicQueueissueAllDeviceQueue(){ returnnewQueue(QUEUE_ISSUE_ALL_DEVICE); } @Bean publicQueueissueCityQueue(){ returnnewQueue(QUEUE_ISSUE_CITY); } @Bean publicQueuepushResultQueue(){ returnnewQueue(QUEUE_PUSH_RESULT); } @Bean publicDirectExchangeissueExchange(){ returnnewDirectExchange(EXCHANGE_ISSUE); } @Bean publicDirectExchangepushExchange(){ //参数1:队列 //参数2:是否持久化 //参数3:是否自动删除 returnnewDirectExchange(EXCHANGE_PUSH,true,true); } @Bean publicBindingissueUserQueueBinding(@Qualifier("issueUserQueue")Queuequeue, @Qualifier("issueExchange")DirectExchangeexchange){ returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER); } @Bean publicBindingissueAllUserQueueBinding(@Qualifier("issueAllUserQueue")Queuequeue, @Qualifier("issueExchange")DirectExchangeexchange){ returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER); } @Bean publicBindingissueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue")Queuequeue, @Qualifier("issueExchange")DirectExchangeexchange){ returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE); } @Bean publicBindingissueCityQueueBinding(@Qualifier("issueCityQueue")Queuequeue, @Qualifier("issueExchange")DirectExchangeexchange){ returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY); } @Bean publicBindingpushResultQueueBinding(@Qualifier("pushResultQueue")Queuequeue, @Qualifier("pushExchange")DirectExchangeexchange){ returnBindingBuilder.bind(queue).to(exchange).withQueueName(); } @Bean publicConnectionFactorydefaultConnectionFactory(){ CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(); connectionFactory.setHost(rabbitProperties.getHost()); connectionFactory.setPort(rabbitProperties.getPort()); connectionFactory.setUsername(rabbitProperties.getUsername()); connectionFactory.setPassword(rabbitProperties.getPassword()); 
connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); returnconnectionFactory; } @Bean publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory( @Qualifier("defaultConnectionFactory")ConnectionFactoryconnectionFactory){ SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); returnfactory; } @Bean publicAmqpTemplaterabbitTemplate(@Qualifier("defaultConnectionFactory")ConnectionFactoryconnectionFactory) { returnnewRabbitTemplate(connectionFactory); } }
三、编写生产者
body=JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING); rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER,body);
四、编写消费者
@RabbitListener(queues=AmqpConfiguration.QUEUE_PUSH_RESULT) publicvoidhandlePushResult(@Payloadbyte[]data,Channelchannel, @Header(AmqpHeaders.DELIVERY_TAG)longdeliveryTag){ }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。