Springboot 配置RabbitMQ文档的方法步骤
简介
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
概念:
- 生产者消息的产生方,负责将消息推送到消息队列
- 消费者消息的最终接受方,负责监听队列中的对应消息,消费消息
- 队列消息的寄存器,负责存放生产者发送的消息
- 交换机负责根据一定规则分发生产者产生的消息
- 绑定完成交换机和队列之间的绑定
模式:
- direct:直连模式,用于实例间的任务分发
- topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
- headers:适用规则复杂的分发,用headers里的参数表达规则
- fanout:分发给所有绑定到该exchange上的队列,忽略routingkey
SpringBoot集成RabbitMQ
一、引入maven依赖
org.springframework.boot spring-boot-starter-amqp 1.5.2.RELEASE
二、配置application.properties
#rabbitmq spring.rabbitmq.host=dev-mq.a.pa.com spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.virtualHost=/message-test/
三、编写AmqpConfiguration配置文件
packagemessage.test.configuration;
importorg.springframework.amqp.core.AcknowledgeMode;
importorg.springframework.amqp.core.AmqpTemplate;
importorg.springframework.amqp.core.Binding;
importorg.springframework.amqp.core.BindingBuilder;
importorg.springframework.amqp.core.DirectExchange;
importorg.springframework.amqp.core.Queue;
importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;
importorg.springframework.amqp.rabbit.connection.ConnectionFactory;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.boot.autoconfigure.amqp.RabbitProperties;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
@Configuration
publicclassAmqpConfiguration{
/**
*消息编码
*/
publicstaticfinalStringMESSAGE_ENCODING="UTF-8";
publicstaticfinalStringEXCHANGE_ISSUE="exchange_message_issue";
publicstaticfinalStringQUEUE_ISSUE_USER="queue_message_issue_user";
publicstaticfinalStringQUEUE_ISSUE_ALL_USER="queue_message_issue_all_user";
publicstaticfinalStringQUEUE_ISSUE_ALL_DEVICE="queue_message_issue_all_device";
publicstaticfinalStringQUEUE_ISSUE_CITY="queue_message_issue_city";
publicstaticfinalStringROUTING_KEY_ISSUE_USER="routing_key_message_issue_user";
publicstaticfinalStringROUTING_KEY_ISSUE_ALL_USER="routing_key_message_issue_all_user";
publicstaticfinalStringROUTING_KEY_ISSUE_ALL_DEVICE="routing_key_message_issue_all_device";
publicstaticfinalStringROUTING_KEY_ISSUE_CITY="routing_key_message_issue_city";
publicstaticfinalStringEXCHANGE_PUSH="exchange_message_push";
publicstaticfinalStringQUEUE_PUSH_RESULT="queue_message_push_result";
@Autowired
privateRabbitPropertiesrabbitProperties;
@Bean
publicQueueissueUserQueue(){
returnnewQueue(QUEUE_ISSUE_USER);
}
@Bean
publicQueueissueAllUserQueue(){
returnnewQueue(QUEUE_ISSUE_ALL_USER);
}
@Bean
publicQueueissueAllDeviceQueue(){
returnnewQueue(QUEUE_ISSUE_ALL_DEVICE);
}
@Bean
publicQueueissueCityQueue(){
returnnewQueue(QUEUE_ISSUE_CITY);
}
@Bean
publicQueuepushResultQueue(){
returnnewQueue(QUEUE_PUSH_RESULT);
}
@Bean
publicDirectExchangeissueExchange(){
returnnewDirectExchange(EXCHANGE_ISSUE);
}
@Bean
publicDirectExchangepushExchange(){
//参数1:队列
//参数2:是否持久化
//参数3:是否自动删除
returnnewDirectExchange(EXCHANGE_PUSH,true,true);
}
@Bean
publicBindingissueUserQueueBinding(@Qualifier("issueUserQueue")Queuequeue,
@Qualifier("issueExchange")DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
}
@Bean
publicBindingissueAllUserQueueBinding(@Qualifier("issueAllUserQueue")Queuequeue,
@Qualifier("issueExchange")DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
}
@Bean
publicBindingissueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue")Queuequeue,
@Qualifier("issueExchange")DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
}
@Bean
publicBindingissueCityQueueBinding(@Qualifier("issueCityQueue")Queuequeue,
@Qualifier("issueExchange")DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
}
@Bean
publicBindingpushResultQueueBinding(@Qualifier("pushResultQueue")Queuequeue,
@Qualifier("pushExchange")DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).withQueueName();
}
@Bean
publicConnectionFactorydefaultConnectionFactory(){
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.getPort());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
returnconnectionFactory;
}
@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(
@Qualifier("defaultConnectionFactory")ConnectionFactoryconnectionFactory){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
returnfactory;
}
@Bean
publicAmqpTemplaterabbitTemplate(@Qualifier("defaultConnectionFactory")ConnectionFactoryconnectionFactory)
{
returnnewRabbitTemplate(connectionFactory);
}
}
三、编写生产者
body=JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING); rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE, AmqpConfiguration.ROUTING_KEY_ISSUE_USER,body);
四、编写消费者
@RabbitListener(queues=AmqpConfiguration.QUEUE_PUSH_RESULT)
publicvoidhandlePushResult(@Payloadbyte[]data,Channelchannel,
@Header(AmqpHeaders.DELIVERY_TAG)longdeliveryTag){
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。