RabbitMQ延迟队列及消息延迟推送实现详解
这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
应用场景
目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:
- 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
- 12306购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着30分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。
在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
- 使用redis给订单设置过期时间,最后通过判断redis中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道redis都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
- 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
- 使用jvm原生的DelayQueue,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。
消息延迟推送的实现
在RabbitMQ3.6.x之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列
在RabbitMQ3.6.x开始,RabbitMQ官方提供了延迟队列的插件,可以下载放置到RabbitMQ根目录下的plugins下。延迟队列插件下载
首先我们创建交换机和消息队列
importorg.springframework.amqp.core.*;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importjava.util.HashMap;
importjava.util.Map;
@Configuration
publicclassMQConfig{
publicstaticfinalStringLAZY_EXCHANGE="Ex.LazyExchange";
publicstaticfinalStringLAZY_QUEUE="MQ.LazyQueue";
publicstaticfinalStringLAZY_KEY="lazy.#";
@Bean
publicTopicExchangelazyExchange(){
//Mappros=newHashMap<>();
//设置交换机支持延迟消息推送
//pros.put("x-delayed-message","topic");
TopicExchangeexchange=newTopicExchange(LAZY_EXCHANGE,true,false,pros);
exchange.setDelayed(true);
returnexchange;
}
@Bean
publicQueuelazyQueue(){
returnnewQueue(LAZY_QUEUE,true);
}
@Bean
publicBindinglazyBinding(){
returnBindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
}
}
我们在Exchange的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。
//Mappros=newHashMap<>(); //设置交换机支持延迟消息推送 //pros.put("x-delayed-message","topic"); TopicExchangeexchange=newTopicExchange(LAZY_EXCHANGE,true,false,pros);
发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数newMessagePostProcessor()是为了获得Message对象,因为需要借助Message对象的api来设置延迟时间。
importcom.anqi.mq.config.MQConfig;
importorg.springframework.amqp.AmqpException;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.core.MessageDeliveryMode;
importorg.springframework.amqp.core.MessagePostProcessor;
importorg.springframework.amqp.rabbit.connection.CorrelationData;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.stereotype.Component;
importjava.util.Date;
@Component
publicclassMQSender{
@Autowired
privateRabbitTemplaterabbitTemplate;
//confirmCallbackreturnCallback代码省略,请参照上一篇
publicvoidsendLazy(Objectmessage){
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id+时间戳全局唯一
CorrelationDatacorrelationData=newCorrelationData("12345678909"+newDate());
//发送消息时指定header延迟时间
rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE,"lazy.boot",message,
newMessagePostProcessor(){
@Override
publicMessagepostProcessMessage(Messagemessage)throwsAmqpException{
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//message.getMessageProperties().setHeader("x-delay","6000");
message.getMessageProperties().setDelay(6000);
returnmessage;
}
},correlationData);
}
}
我们可以观察setDelay(Integeri)底层代码,也是在header中设置x-delay。等同于我们手动设置header
message.getMessageProperties().setHeader("x-delay","6000");
/**
*Setthex-delayheader.
*@paramdelaythedelay.
*@since1.6
*/
publicvoidsetDelay(Integerdelay){
if(delay==null||delay<0){
this.headers.remove(X_DELAY);
}
else{
this.headers.put(X_DELAY,delay);
}
}
消费端进行消费
importcom.rabbitmq.client.Channel;
importorg.springframework.amqp.rabbit.annotation.*;
importorg.springframework.amqp.support.AmqpHeaders;
importorg.springframework.stereotype.Component;
importjava.io.IOException;
importjava.util.Map;
@Component
publicclassMQReceiver{
@RabbitListener(queues="MQ.LazyQueue")
@RabbitHandler
publicvoidonLazyMessage(Messagemsg,Channelchannel)throwsIOException{
longdeliveryTag=msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,true);
System.out.println("lazyreceive"+newString(msg.getBody()));
}
测试结果
importorg.junit.Test;
importorg.junit.runner.RunWith;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.boot.test.context.SpringBootTest;
importorg.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
publicclassMQSenderTest{
@Autowired
privateMQSendermqSender;
@Test
publicvoidsendLazy()throwsException{
Stringmsg="hellospringboot";
mqSender.sendLazy(msg+":");
}
}
果然在6秒后收到了消息lazyreceivehellospringboot:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。