RabbitMQ 的消息持久化与 Spring AMQP 的实现详解
前言
要从奔溃的RabbitMQ中恢复的消息,我们需要做消息持久化。如果消息要从RabbitMQ奔溃中恢复,那么必须满足三点,且三者缺一不可。
- 交换器必须是持久化。
- 队列必须是持久化的。
- 消息必须是持久化的。
原生的实现方式
原生的RabbitMQ客户端需要完成三个步骤。
第一步,交换器的持久化。
//参数1exchange:交换器名 //参数2type:交换器类型 //参数3durable:是否持久化 channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);
第二步,队列的持久化。
//参数1queue:队列名 //参数2durable:是否持久化 //参数3exclusive:仅创建者可以使用的私有队列,断开后自动删除 //参数4autoDelete:当所有消费客户端连接断开后,是否自动删除队列 //参数5arguments channel.queueDeclare(QUEUE_NAME,true,false,false,null);
第三步,消息的持久化。
//参数1exchange:交换器 //参数2routingKey:路由键 //参数3props:消息的其他参数,其中MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化 //参数4body:消息体 channel.basicPublish("",queue_name,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
SpringAMQP的实现方式
SpringAMQP是对原生的RabbitMQ客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。
其中,交换器的持久化配置如下。
//参数1name:交互器名 //参数2durable:是否持久化 //参数3autoDelete:当所有消费客户端连接断开后,是否自动删除队列 newTopicExchange(name,durable,autoDelete)
此外,还需要再配置队列的持久化。
//参数1name:队列名 //参数2durable:是否持久化 //参数3exclusive:仅创建者可以使用的私有队列,断开后自动删除 //参数4autoDelete:当所有消费客户端连接断开后,是否自动删除队列 newQueue(name,durable,exclusive,autoDelete);
至此,RabbitMQ的消息持久化配置完毕。
那么,消息的持久化难道不需要配置么?确实如此,我们来看下源码。
一般情况下,我们会通过这种方式发送消息。
rabbitTemplate.convertAndSend(exchange,routeKey,message);
其中,调用了convertAndSend(Stringexchange,StringroutingKey,finalObjectobject)方法。
@Override publicvoidconvertAndSend(Stringexchange,StringroutingKey,finalObjectobject)throwsAmqpException{ convertAndSend(exchange,routingKey,object,(CorrelationData)null); }
接着,用调用了convertAndSend(Stringexchange,StringroutingKey,finalObjectobject,CorrelationDatacorrelationData)方法。
publicvoidconvertAndSend(Stringexchange,StringroutingKey,finalObjectobject,CorrelationDatacorrelationData)throwsAmqpException{ send(exchange,routingKey,convertMessageIfNecessary(object),correlationData); }
此时,最关键的方法出现了,它是convertMessageIfNecessary(finalObjectobject)。
protectedMessageconvertMessageIfNecessary(finalObjectobject){ if(objectinstanceofMessage){ return(Message)object; } returngetRequiredMessageConverter().toMessage(object,newMessageProperties()); }
其中,关键的是MessageProperties类,它持久化的策略是MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。
publicclassMessagePropertiesimplementsSerializable{ publicMessageProperties(){ this.deliveryMode=DEFAULT_DELIVERY_MODE; this.priority=DEFAULT_PRIORITY; } static{ DEFAULT_DELIVERY_MODE=MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY=Integer.valueOf(0); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。