Spring Boot 入门之消息中间件的使用
一、前言
在消息中间件中有2个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。
我们常用的消息代理有JMS和AMQP规范。对应地,它们常见的实现分别是ActiveMQ和RabbitMQ。
二、整合ActiveMQ
2.1添加依赖
org.springframework.boot spring-boot-starter-activemq org.apache.activemq activemq-pool 
2.2添加配置
#activemq配置 spring.activemq.broker-url=tcp://192.168.2.12:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false spring.activemq.pool.max-connections=50 #使用发布/订阅模式时,下边配置需要设置成true spring.jms.pub-sub-domain=false
此处spring.activemq.pool.enabled=false,表示关闭连接池。
2.3编码
配置类:
@Configuration
publicclassJmsConfirguration{
publicstaticfinalStringQUEUE_NAME="activemq_queue";
publicstaticfinalStringTOPIC_NAME="activemq_topic";
@Bean
publicQueuequeue(){
returnnewActiveMQQueue(QUEUE_NAME);
}
@Bean
publicTopictopic(){
returnnewActiveMQTopic(TOPIC_NAME);
}
}
负责创建队列和主题。
消息生产者:
@Component
publicclassJmsSender{
@Autowired
privateQueuequeue;
@Autowired
privateTopictopic;
@Autowired
privateJmsMessagingTemplatejmsTemplate;
publicvoidsendByQueue(Stringmessage){
this.jmsTemplate.convertAndSend(queue,message);
}
publicvoidsendByTopic(Stringmessage){
this.jmsTemplate.convertAndSend(topic,message);
}
}
消息消费者:
@Component
publicclassJmsReceiver{
@JmsListener(destination=JmsConfirguration.QUEUE_NAME)
publicvoidreceiveByQueue(Stringmessage){
System.out.println("接收队列消息:"+message);
}
@JmsListener(destination=JmsConfirguration.TOPIC_NAME)
publicvoidreceiveByTopic(Stringmessage){
System.out.println("接收主题消息:"+message);
}
}
消息消费者使用@JmsListener注解监听消息。
2.4测试
@RunWith(SpringRunner.class)
@SpringBootTest
publicclassJmsTest{
@Autowired
privateJmsSendersender;
@Test
publicvoidtestSendByQueue(){
for(inti=1;i<6;i++){
this.sender.sendByQueue("helloactivemqqueue"+i);
}
}
@Test
publicvoidtestSendByTopic(){
for(inti=1;i<6;i++){
this.sender.sendByTopic("helloactivemqtopic"+i);
}
}
}
打印结果:
接收队列消息:helloactivemqqueue1
接收队列消息:helloactivemqqueue2
接收队列消息:helloactivemqqueue3
接收队列消息:helloactivemqqueue4
接收队列消息:helloactivemqqueue5
测试发布/订阅模式时,设置spring.jms.pub-sub-domain=true
接收主题消息:helloactivemqtopic1
接收主题消息:helloactivemqtopic2
接收主题消息:helloactivemqtopic3
接收主题消息:helloactivemqtopic4
接收主题消息:helloactivemqtopic5
三、整合RabbitMQ
3.1添加依赖
org.springframework.boot spring-boot-starter-amqp 
3.2添加配置
spring.rabbitmq.host=192.168.2.30 spring.rabbitmq.port=5672 spring.rabbitmq.username=light spring.rabbitmq.password=light spring.rabbitmq.virtual-host=/test
3.3编码
配置类:
@Configuration
publicclassAmqpConfirguration{
//=============简单、工作队列模式===============
publicstaticfinalStringSIMPLE_QUEUE="simple_queue";
@Bean
publicQueuequeue(){
returnnewQueue(SIMPLE_QUEUE,true);
}
//===============发布/订阅模式============
publicstaticfinalStringPS_QUEUE_1="ps_queue_1";
publicstaticfinalStringPS_QUEUE_2="ps_queue_2";
publicstaticfinalStringFANOUT_EXCHANGE="fanout_exchange";
@Bean
publicQueuepsQueue1(){
returnnewQueue(PS_QUEUE_1,true);
}
@Bean
publicQueuepsQueue2(){
returnnewQueue(PS_QUEUE_2,true);
}
@Bean
publicFanoutExchangefanoutExchange(){
returnnewFanoutExchange(FANOUT_EXCHANGE);
}
@Bean
publicBindingfanoutBinding1(){
returnBindingBuilder.bind(psQueue1()).to(fanoutExchange());
}
@Bean
publicBindingfanoutBinding2(){
returnBindingBuilder.bind(psQueue2()).to(fanoutExchange());
}
//===============路由模式============
publicstaticfinalStringROUTING_QUEUE_1="routing_queue_1";
publicstaticfinalStringROUTING_QUEUE_2="routing_queue_2";
publicstaticfinalStringDIRECT_EXCHANGE="direct_exchange";
@Bean
publicQueueroutingQueue1(){
returnnewQueue(ROUTING_QUEUE_1,true);
}
@Bean
publicQueueroutingQueue2(){
returnnewQueue(ROUTING_QUEUE_2,true);
}
@Bean
publicDirectExchangedirectExchange(){
returnnewDirectExchange(DIRECT_EXCHANGE);
}
@Bean
publicBindingdirectBinding1(){
returnBindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");
}
@Bean
publicBindingdirectBinding2(){
returnBindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");
}
//===============主题模式============
publicstaticfinalStringTOPIC_QUEUE_1="topic_queue_1";
publicstaticfinalStringTOPIC_QUEUE_2="topic_queue_2";
publicstaticfinalStringTOPIC_EXCHANGE="topic_exchange";
@Bean
publicQueuetopicQueue1(){
returnnewQueue(TOPIC_QUEUE_1,true);
}
@Bean
publicQueuetopicQueue2(){
returnnewQueue(TOPIC_QUEUE_2,true);
}
@Bean
publicTopicExchangetopicExchange(){
returnnewTopicExchange(TOPIC_EXCHANGE);
}
@Bean
publicBindingtopicBinding1(){
returnBindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");
}
@Bean
publicBindingtopicBinding2(){
returnBindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
}
}
RabbitMQ有多种工作模式,因此配置比较多。想了解相关内容的读者可以查看《RabbitMQ工作模式介绍》或者自行百度相关资料。
消息生产者:
@Component
publicclassAmqpSender{
@Autowired
privateAmqpTemplateamqpTemplate;
/**
*简单模式发送
*
*@parammessage
*/
publicvoidsimpleSend(Stringmessage){
this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE,message);
}
/**
*发布/订阅模式发送
*
*@parammessage
*/
publicvoidpsSend(Stringmessage){
this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE,"",message);
}
/**
*路由模式发送
*
*@parammessage
*/
publicvoidroutingSend(StringroutingKey,Stringmessage){
this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE,routingKey,message);
}
/**
*主题模式发送
*
*@paramroutingKey
*@parammessage
*/
publicvoidtopicSend(StringroutingKey,Stringmessage){
this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE,routingKey,message);
}
}
消息消费者:
@Component
publicclassAmqpReceiver{
/**
*简单模式接收
*
*@parammessage
*/
@RabbitListener(queues=AmqpConfirguration.SIMPLE_QUEUE)
publicvoidsimpleReceive(Stringmessage){
System.out.println("接收消息:"+message);
}
/**
*发布/订阅模式接收
*
*@parammessage
*/
@RabbitListener(queues=AmqpConfirguration.PS_QUEUE_1)
publicvoidpsReceive1(Stringmessage){
System.out.println(AmqpConfirguration.PS_QUEUE_1+"接收消息:"+message);
}
@RabbitListener(queues=AmqpConfirguration.PS_QUEUE_2)
publicvoidpsReceive2(Stringmessage){
System.out.println(AmqpConfirguration.PS_QUEUE_2+"接收消息:"+message);
}
/**
*路由模式接收
*
*@parammessage
*/
@RabbitListener(queues=AmqpConfirguration.ROUTING_QUEUE_1)
publicvoidroutingReceive1(Stringmessage){
System.out.println(AmqpConfirguration.ROUTING_QUEUE_1+"接收消息:"+message);
}
@RabbitListener(queues=AmqpConfirguration.ROUTING_QUEUE_2)
publicvoidroutingReceive2(Stringmessage){
System.out.println(AmqpConfirguration.ROUTING_QUEUE_2+"接收消息:"+message);
}
/**
*主题模式接收
*
*@parammessage
*/
@RabbitListener(queues=AmqpConfirguration.TOPIC_QUEUE_1)
publicvoidtopicReceive1(Stringmessage){
System.out.println(AmqpConfirguration.TOPIC_QUEUE_1+"接收消息:"+message);
}
@RabbitListener(queues=AmqpConfirguration.TOPIC_QUEUE_2)
publicvoidtopicReceive2(Stringmessage){
System.out.println(AmqpConfirguration.TOPIC_QUEUE_2+"接收消息:"+message);
}
}
消息消费者使用@RabbitListener注解监听消息。
3.4测试
@RunWith(SpringRunner.class)
@SpringBootTest
publicclassAmqpTest{
@Autowired
privateAmqpSendersender;
@Test
publicvoidtestSimpleSend(){
for(inti=1;i<6;i++){
this.sender.simpleSend("testsimpleSend"+i);
}
}
@Test
publicvoidtestPsSend(){
for(inti=1;i<6;i++){
this.sender.psSend("testpsSend"+i);
}
}
@Test
publicvoidtestRoutingSend(){
for(inti=1;i<6;i++){
this.sender.routingSend("order","testroutingSend"+i);
}
}
@Test
publicvoidtestTopicSend(){
for(inti=1;i<6;i++){
this.sender.topicSend("user.add","testtopicSend"+i);
}
}
}
测试结果略过。。。
踩坑提醒1:ACCESS_REFUSED–LoginwasrefusedusingauthenticationmechanismPLAIN
解决方案:
1)请确保用户名和密码是否正确,需要注意的是用户名和密码的值是否包含空格或制表符(笔者测试时就是因为密码多了一个制表符导致认证失败)。
2)如果测试账户使用的是guest,需要修改rabbitmq.conf文件。在该文件中添加“loopback_users=none”配置。
踩坑提醒2:Cannotpreparequeueforlistener.Eitherthequeuedoesn'texistorthebrokerwillnotallowustouseit
解决方案:
我们可以登陆RabbitMQ的管理界面,在Queue选项中手动添加对应的队列。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。