SpringBoot使用RabbitMQ延时队列(小白必备)
1.什么是MQ
MQ,是一种跨进程的通信机制,用于上下游传递消息。
在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。
使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
为什么会产生消息列队?
- 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
- 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
延时列队的使用场景?
- 订单业务:在淘宝或者京东购买东西,用户下单后未付款则30分钟后取消订单。
- 短信通知:手机用户交完话费后,几分钟之内将会收到缴费信息
2.什么是RabbitMQ(这里就做了一下简单介绍)
RabbitMQ是一种消息队列,用于常见的进程通信。支持点对点,请求应答和发布订阅模式并且提供多种语言的支持。常见的java,c#,php都支持。
常被用在异步处理,应用解耦。流量消锋等复杂的业务场景中。和java的kafka一样都属于消息中间件。
下载地址:
https://www.rabbitmq.com/download.html
进入RabbitMQ官网
1.第一步
第二步
下载好后不要着急安装RabbitMQ,我们这里还需要安装Erlang
下载地址:http://www.erlang.org/download/otp_win64_17.3.exe
安装步骤
步骤一
现在安装RabbitMQ
步骤一
启动RabbitMQ管理工具
开始菜单—最新添加—展开—选中双击
输入命令:rabbitmq-pluginsenablerabbitmq_management
效果如果图
在浏览器中输入地址查看:http://127.0.0.1:15672/
若不出现此页面,就是安装失败了,不要慌,多半问题在系统用户名必须是中文(放心有解决办法):
Windows下安装RabbitMQ后,按正常RabbitMQ会自动注册服务并自动启动,但是如果有的道友不注意中英文目录就会出现服务启动后几秒钟自动停止,而且反反复复。
出现这种情况一般都是由我们的用户名是中文,而导致默认的DB和log访问出现问。所以我建议以后大家在使用windows操作系统的时候尽量用英文来命名文件或目录,这样会极大的减小以后安装软件出现莫名其妙的问题的bug。
接下来我们先卸载我们的RabbitMQ,然后在我们的系统变量里设置一个RABBITMQ_BASE的变量路径为一个不含英文的路径比如E:\rabbit,最后我们重新安装RabbitMQ即可,然后就会看到RabbitMQ服务自动注册了,并且不会自动停止。
SpringBoot整合RabbitMQ
1.添加依赖
pom.xml中添加spring-boot-starter-amqp的依赖
org.springframework.boot spring-boot-starter-amqp
其他依赖
org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine junit junit 4.12 test
application.yml文件中配置rabbitmq相关内容
spring: rabbitmq: host:localhost port:5672 username:guest password:guest
这里我们环境就搭建起来了
2.具体编码实现
配置列队
packagecom.example.spring_boot_rabbitmq; importlombok.extern.slf4j.Slf4j; importorg.springframework.amqp.core.*; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importjava.util.HashMap; importjava.util.Map; /** *@author:zq *@date:Greatedin2019/12/1911:46 *配置队列 */ @Configuration @Slf4j publicclassDelayRabbitConfig{ /** *延迟队列TTL名称 */ privatestaticfinalStringORDER_DELAY_QUEUE="user.order.delay.queue"; /** *DLX,deadletter发送到的exchange *延时消息就是发送到该交换机的 */ publicstaticfinalStringORDER_DELAY_EXCHANGE="user.order.delay.exchange"; /** *routingkey名称 *具体消息发送在该routingKey的 */ publicstaticfinalStringORDER_DELAY_ROUTING_KEY="order_delay"; publicstaticfinalStringORDER_QUEUE_NAME="user.order.queue"; publicstaticfinalStringORDER_EXCHANGE_NAME="user.order.exchange"; publicstaticfinalStringORDER_ROUTING_KEY="order"; /** *延迟队列配置 **1、params.put("x-message-ttl",5*1000); *第一种方式是直接设置Queue延迟时间但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先) *2、rabbitTemplate.convertAndSend(book,message->{ *message.getMessageProperties().setExpiration(2*1000+""); *returnmessage; *}); *第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制 **/ @Bean publicQueuedelayOrderQueue(){ Map
params=newHashMap<>(); //x-dead-letter-exchange声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange",ORDER_EXCHANGE_NAME); //x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。 params.put("x-dead-letter-routing-key",ORDER_ROUTING_KEY); returnnewQueue(ORDER_DELAY_QUEUE,true,false,false,params); } /** *需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 *这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键“dog”,则只有被标记为“dog”的消息才被转发, *不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 *@returnDirectExchange */ @Bean publicDirectExchangeorderDelayExchange(){ returnnewDirectExchange(ORDER_DELAY_EXCHANGE); } @Bean publicBindingdlxBinding(){ returnBindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY); } @Bean publicQueueorderQueue(){ returnnewQueue(ORDER_QUEUE_NAME,true); } /** *将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。 *符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。 **/ @Bean publicTopicExchangeorderTopicExchange(){ returnnewTopicExchange(ORDER_EXCHANGE_NAME); } @Bean publicBindingorderBinding(){ //TODO如果要让延迟队列之间有关联,这里的routingKey和绑定的交换机很关键 returnBindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY); } }
创建一个Order实体类
packagecom.example.spring_boot_rabbitmq.pojo; importlombok.Data; importjava.io.Serializable; /** *@author:zq *@date:Greatedin2019/12/1911:49 */ @Data publicclassOrderimplementsSerializable{ privatestaticfinallongserialVersionUID=-2221214252163879885L; privateStringorderId;//订单id privateIntegerorderStatus;//订单状态0:未支付,1:已支付,2:订单已取消 privateStringorderName;//订单名字 }
接收者
packagecom.example.spring_boot_rabbitmq; importcom.example.spring_boot_rabbitmq.pojo.Order; importcom.rabbitmq.client.Channel; importlombok.extern.slf4j.Slf4j; importorg.springframework.amqp.core.Message; importorg.springframework.amqp.rabbit.annotation.RabbitListener; importorg.springframework.stereotype.Component; importjava.util.Date; /** *@author:zq *@date:Greatedin2019/12/1911:53 *接收者 */ @Component @Slf4j publicclassDelayReceiver{ @RabbitListener(queues={DelayRabbitConfig.ORDER_QUEUE_NAME}) publicvoidorderDelayQueue(Orderorder,Messagemessage,Channelchannel){ log.info("###########################################"); log.info("【orderDelayQueue监听的消息】-【消费时间】-[{}]-【订单内容】-[{}]",newDate(),order.toString()); if(order.getOrderStatus()==0){ order.setOrderStatus(2); log.info("【该订单未支付,取消订单】"+order.toString()); }elseif(order.getOrderStatus()==1){ log.info("【该订单已完成支付】"); }elseif(order.getOrderStatus()==2){ log.info("【该订单已取消】"); } log.info("###########################################"); } }
发送者
packagecom.example.spring_boot_rabbitmq; importcom.example.spring_boot_rabbitmq.pojo.Order; importlombok.extern.slf4j.Slf4j; importorg.springframework.amqp.core.AmqpTemplate; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.stereotype.Component; importjava.util.Date; /** *@author:zq *@date:Greatedin2019/12/1911:55 *发送者 */ @Component @Slf4j publicclassDelaySender{ @Autowired privateAmqpTemplateamqpTemplate; publicvoidsendDelay(Orderorder){ log.info("【订单生成时间】"+newDate().toString()+"【1分钟后检查订单是否已经支付】"+order.toString()); this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE,DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY,order,message->{ //如果配置了params.put("x-message-ttl",5*1000);那么这一句也可以省略,具体根据业务需要是声明Queue的时候就指定好延迟时间还是在发送自己控制时间 message.getMessageProperties().setExpiration(1*1000*60+""); returnmessage; }); } }
测试,访问http://localhost:8080/sendDelay查看日志输出
packagecom.example.spring_boot_rabbitmq; importcom.example.spring_boot_rabbitmq.pojo.Order; importorg.springframework.web.bind.annotation.RestController; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.web.bind.annotation.GetMapping; /** *@author:zq *@date:Greatedin2019/12/1911:57 *测试 */ @RestController publicclassTestController{ @Autowired privateDelaySenderdelaySender; @GetMapping("/sendDelay") publicObjectsendDelay(){ Orderorder1=newOrder(); order1.setOrderStatus(0); order1.setOrderId("123456"); order1.setOrderName("小米6"); Orderorder2=newOrder(); order2.setOrderStatus(1); order2.setOrderId("456789"); order2.setOrderName("小米8"); delaySender.sendDelay(order1); delaySender.sendDelay(order2); return"ok"; } }
输出
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。