Java搭建RabbitMq消息中间件过程详解
这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
前言
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。
名词
- exchange:交换机
 - routingkey:路由key
 - queue:队列
 
控制台端口:15672
exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。
使用场景
1.技能订单3分钟自动取消,改变状态
2.直播开始前15分钟提醒
3.直播状态自动结束
流程
生产者发送消息—>order_pre_exchange交换机—>order_per_ttl_delay_queue队列
—>时间到期—>order_delay_exchange交换机—>order_delay_process_queue队列—>消费者
第一步:在pom文件中添加
org.springframework.boot spring-boot-starter-amqp 
第二步:在application.properties文件中添加
spring.rabbitmq.host=172.xx.xx.xxx spring.rabbitmq.port=5672 spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
第三步:配置OrderQueueConfig
packagecom.tuohang.platform.config;
importorg.springframework.amqp.core.Binding;
importorg.springframework.amqp.core.BindingBuilder;
importorg.springframework.amqp.core.DirectExchange;
importorg.springframework.amqp.core.Queue;
importorg.springframework.amqp.core.QueueBuilder;
importorg.springframework.amqp.rabbit.connection.ConnectionFactory;
importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
importorg.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
/**
*rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
*
*
*@authorAdministrator
*@version1.0
*@Date2018年9月18日
*/
@Configuration
publicclassOrderQueueConfig{
/**
*订单缓冲交换机名称
*/
publicfinalstaticStringORDER_PRE_EXCHANGE_NAME="order_pre_exchange";
/**
*发送到该队列的message会在一段时间后过期进入到order_delay_process_queue【队列里所有的message都有统一的失效时间】
*/
publicfinalstaticStringORDER_PRE_TTL_DELAY_QUEUE_NAME="order_pre_ttl_delay_queue";
/**
*订单的交换机DLX名字
*/
finalstaticStringORDER_DELAY_EXCHANGE_NAME="order_delay_exchange";
/**
*订单message时间过期后进入的队列,也就是订单实际的消费队列
*/
publicfinalstaticStringORDER_DELAY_PROCESS_QUEUE_NAME="order_delay_process_queue";
/**
*订单在缓冲队列过期时间(毫秒)30分钟
*/
publicfinalstaticintORDER_QUEUE_EXPIRATION=1800000;
/**
*订单缓冲交换机
*
*@return
*/
@Bean
publicDirectExchangepreOrderExange(){
returnnewDirectExchange(ORDER_PRE_EXCHANGE_NAME);
}
/**
*创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
*
*@return
*/
@Bean
publicQueuedelayQueuePerOrderTTLQueue(){
returnQueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange",ORDER_DELAY_EXCHANGE_NAME)//DLX
.withArgument("x-dead-letter-routing-key",ORDER_DELAY_PROCESS_QUEUE_NAME)//deadletter携带的routingkey
.withArgument("x-message-ttl",ORDER_QUEUE_EXPIRATION)//设置订单队列的过期时间
.build();
}
/**
*将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
*
*@paramdelayQueuePerOrderTTLQueue
*@parampreOrderExange
*@return
*/
@Bean
publicBindingqueueOrderTTLBinding(QueuedelayQueuePerOrderTTLQueue,DirectExchangepreOrderExange){
returnBindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
}
/**
*创建订单的DLXexchange
*
*@return
*/
@Bean
publicDirectExchangedelayOrderExchange(){
returnnewDirectExchange(ORDER_DELAY_EXCHANGE_NAME);
}
/**
*创建order_delay_process_queue队列,也就是订单实际消费队列
*
*@return
*/
@Bean
publicQueuedelayProcessOrderQueue(){
returnQueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
}
/**
*将DLX绑定到实际消费队列
*
*@paramdelayProcessOrderQueue
*@paramdelayExchange
*@return
*/
@Bean
publicBindingdlxOrderBinding(QueuedelayProcessOrderQueue,DirectExchangedelayOrderExchange){
returnBindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
}
/**
*监听订单实际消费者队列order_delay_process_queue
*
*@paramconnectionFactory
*@paramprocessReceiver
*@return
*/
@Bean
publicSimpleMessageListenerContainerorderProcessContainer(ConnectionFactoryconnectionFactory,
OrderProcessReceiverprocessReceiver){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME);//监听order_delay_process_queue
container.setMessageListener(newMessageListenerAdapter(processReceiver));
returncontainer;
}
}
消费者OrderProcessReceiver:
packagecom.tuohang.platform.config;
importjava.util.Objects;
importorg.apache.tools.ant.types.resources.selectors.Date;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
importorg.springframework.stereotype.Component;
importcom.rabbitmq.client.Channel;
/**
*订单延迟处理消费者
*
*
*@authorAdministrator
*@version1.0
*@Date2018年9月18日
*/
@Component
publicclassOrderProcessReceiverimplementsChannelAwareMessageListener{
privatestaticLoggerlogger=LoggerFactory.getLogger(OrderProcessReceiver.class);
Stringmsg="Thefailedmessagewillautoretryafteracertaindelay";
@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
try{
processMessage(message);
}catch(Exceptione){
//如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME,OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME,null,
msg.getBytes());
}
}
/**
*处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
*
*@parammessage
*@throwsException
*/
publicvoidprocessMessage(Messagemessage)throwsException{
StringrealMessage=newString(message.getBody());
logger.info("Received<"+realMessage+">");
//取消订单
if(!Objects.equals(realMessage,msg)){
//SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
System.out.println("测试111111-----------"+newDate());
System.out.println(message);
}
}
}
或者
/**
*测试rabbit消费者
*
*
*@authorAdministrator
*@version1.0
*@Date2018年9月25日
*/
@Component
@RabbitListener(queues=TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
publicclassTestProcessReceiver{
privatestaticLoggerlogger=LoggerFactory.getLogger(TestProcessReceiver.class);
Stringmsg="Thefailedmessagewillautoretryafteracertaindelay";
@RabbitHandler
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
try{
processMessage(message);
//告诉服务器收到这条消息已经被我消费了可以在队列删掉;否则消息服务器以为这条消息没处理掉后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(Exceptione){
//如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME,TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME,null,
msg.getBytes());
}
}
/**
*处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
*
*@parammessage
*@throwsException
*/
publicvoidprocessMessage(Messagemessage)throwsException{
StringrealMessage=newString(message.getBody());
logger.info("Received<"+realMessage+">");
//取消订单
if(!Objects.equals(realMessage,msg)){
System.out.println("测试111111-----------"+newDate());
}else{
System.out.println("rabbitelse...");
}
}
}
生产者
/**
*测试rabbitmq
*
*@return
*/
@RequestMapping(value="/testrab")
publicStringtestraa(){
GenericResultgr=null;
try{
Stringname="test_pre_ttl_delay_queue";
longexpiration=10000;//10s过期时间
rabbitTemplate.convertAndSend(name,String.valueOf(123456));
//在单个消息上设置过期时间
//rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456),newExpirationMessagePostProcessor(expiration));
}catch(ServiceExceptione){
e.printStackTrace();
gr=newGenericResult(StateCode.ERROR,languageMap.get("network_error"),e.getMessage());
}
returngetWrite(gr);
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。