SpringBoot如何优雅的使用RocketMQ
本文内容纲要:
-SpringBoot如何优雅的使用RocketMQ
目录
-
SpringBoot如何优雅的使用RocketMQ
- 什么是RocketMQ?
- RocketMQ环境安装
- SpringBoot环境中使用RocketMQ
SpringBoot如何优雅的使用RocketMQ
MQ,是一种跨进程的通信机制,用于上下游传递消息。在传统的互联网架构中通常使用MQ来对上下游来做解耦合。
举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理。
什么是RocketMQ?
官方说明:
随着使用越来越多的队列和虚拟主题,ActiveMQIO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。
看到这里可以很清楚的知道RcoketMQ是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。
具有以下特性:
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
- 能够保证严格的消息顺序,在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
- 提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式
- 单一队列百万消息的堆积能力,亿级消息堆积能力
- 支持多种消息协议,如JMS、MQTT等
- 分布式高可用的部署架构,满足至少一次消息传递语义
RocketMQ环境安装
下载地址:https://rocketmq.apache.org/dowloading/releases/
从官方下载二进制或者源码来进行使用。源码编译需要Maven3.2x,JDK8
在根目录进行打包:
mvn-Prelease-all-DskipTestscleanpackager-U
distribution/target/apache-rocketmq
文件夹中会存在一个文件夹版,zip,tar三个可运行的完整程序。
使用rocketmq-4.6.0.zip
:
- 启动名称服务mqnamesrv.cmd
- 启动数据中心mqbroker.cmd-nlocalhost:9876
SpringBoot环境中使用RocketMQ
SpringBoot入门:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot常用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
当前环境版本为:
-
SpringBoot2.0.6.RELEASE
-
SpringCloudFinchley.RELEASE
-
SpringCldodAlibaba0.2.1.RELEASE
-
RocketMQ4.3.0
在项目工程中导入:org.apache.rocketmq rocketmq-client ${rocketmq.version}
由于我们这边已经有工程了所以就不在进行创建这种过程了。主要是看看如何使用RocketMQ。
创建RocketMQProperties配置属性类,类中内容如下:
@ConfigurationProperties(prefix="rocketmq")
publicclassRocketMQProperties{
privatebooleanisEnable=false;
privateStringnamesrvAddr="localhost:9876";
privateStringgroupName="default";
privateintproducerMaxMessageSize=1024;
privateintproducerSendMsgTimeout=2000;
privateintproducerRetryTimesWhenSendFailed=2;
privateintconsumerConsumeThreadMin=5;
privateintconsumerConsumeThreadMax=30;
privateintconsumerConsumeMessageBatchMaxSize=1;
//省略getset
}
现在我们所有子系统中的生产者,消费者对应:
isEnable是否开启mq
namesrvAddr集群地址
groupName分组名称
设置为统一已方便系统对接,如有其它需求在进行扩展,类中我们已经给了默认值也可以在配置文件或配置中心中获取配置,配置如下:
#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.groupName=please_rename_unique_group_name
#是否开启自动配置
rocketmq.isEnable=true
#mq的nameserver地址
rocketmq.namesrvAddr=127.0.0.1:9876
#消息最大长度默认1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
#发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
#消费者线程数量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
#设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumeMessageBatchMaxSize=1
创建消费者接口RocketConsumer.java该接口用户约束消费者需要的核心步骤:
/**
*消费者接口
*
*@authorSimpleWu
*
*/
publicinterfaceRocketConsumer{
/**
*初始化消费者
*/
publicabstractvoidinit();
/**
*注册监听
*
*@parammessageListener
*/
publicvoidregisterMessageListener(MessageListenermessageListener);
}
创建抽象消费者AbstractRocketConsumer.java:
/**
*消费者基本信息
*
*@authorSimpelWu
*/
publicabstractclassAbstractRocketConsumerimplementsRocketConsumer{
protectedStringtopics;
protectedStringtags;
protectedMessageListenermessageListener;
protectedStringconsumerTitel;
protectedMQPushConsumermqPushConsumer;
/**
*必要的信息
*
*@paramtopics
*@paramtags
*@paramconsumerTitel
*/
publicvoidnecessary(Stringtopics,Stringtags,StringconsumerTitel){
this.topics=topics;
this.tags=tags;
this.consumerTitel=consumerTitel;
}
publicabstractvoidinit();
@Override
publicvoidregisterMessageListener(MessageListenermessageListener){
this.messageListener=messageListener;
}
}
在类中我们必须指定这个topics,tags与消息监听逻辑
publicabstractvoidinit();
该方法是用于初始化消费者,由子类实现。
接下来我们编写自动配置类RocketMQConfiguation.java,该类用户初始化一个默认的生产者连接,以及加载所有的消费者。
@EnableConfigurationProperties({RocketMQProperties.class})使用该配置文件
@Configuration标注为配置类
@ConditionalOnProperty(prefix="rocketmq",value="isEnable",havingValue="true")只有当配置中指定rocketmq.isEnable=true的时候才会生效
核心内容如下:
/**
*mq配置
*
*@authorSimpleWu
*/
@Configuration
@EnableConfigurationProperties({RocketMQProperties.class})
@ConditionalOnProperty(prefix="rocketmq",value="isEnable",havingValue="true")
publicclassRocketMQConfiguation{
privateRocketMQPropertiesproperties;
privateApplicationContextapplicationContext;
privateLoggerlog=LoggerFactory.getLogger(RocketMQConfiguation.class);
publicRocketMQConfiguation(RocketMQPropertiesproperties,ApplicationContextapplicationContext){
this.properties=properties;
this.applicationContext=applicationContext;
}
/**
*注入一个默认的消费者
*@return
*@throwsMQClientException
*/
@Bean
publicDefaultMQProducergetRocketMQProducer()throwsMQClientException{
if(StringUtils.isEmpty(properties.getGroupName())){
thrownewMQClientException(-1,"groupNameisblank");
}
if(StringUtils.isEmpty(properties.getNamesrvAddr())){
thrownewMQClientException(-1,"nameServerAddrisblank");
}
DefaultMQProducerproducer;
producer=newDefaultMQProducer(properties.getGroupName());
producer.setNamesrvAddr(properties.getNamesrvAddr());
//producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
//producer.setInstanceName(instanceName);
producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
//如果发送消息失败,设置重试次数,默认为2次
producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());
try{
producer.start();
log.info("producerisstart!groupName:{},namesrvAddr:{}",properties.getGroupName(),
properties.getNamesrvAddr());
}catch(MQClientExceptione){
log.error(String.format("produceriserror{}",e.getMessage(),e));
throwe;
}
returnproducer;
}
/**
*SpringBoot启动时加载所有消费者
*/
@PostConstruct
publicvoidinitConsumer(){
Map<String,AbstractRocketConsumer>consumers=applicationContext.getBeansOfType(AbstractRocketConsumer.class);
if(consumers==null||consumers.size()==0){
log.info("initrocketconsumer0");
}
Iterator<String>beans=consumers.keySet().iterator();
while(beans.hasNext()){
StringbeanName=(String)beans.next();
AbstractRocketConsumerconsumer=consumers.get(beanName);
consumer.init();
createConsumer(consumer);
log.info("initsuccessconsumertitle{},toips{},tags{}",consumer.consumerTitel,consumer.tags,
consumer.topics);
}
}
/**
*通过消费者信心创建消费者
*
*@paramconsumerPojo
*/
publicvoidcreateConsumer(AbstractRocketConsumerarc){
DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer(this.properties.getGroupName());
consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
consumer.registerMessageListener(arc.messageListenerConcurrently);
/**
*设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费如果非第一次启动,那么按照上次消费的位置继续消费
*/
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/**
*设置消费模型,集群还是广播,默认为集群
*/
//consumer.setMessageModel(MessageModel.CLUSTERING);
/**
*设置一次消费消息的条数,默认为1条
*/
consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
try{
consumer.subscribe(arc.topics,arc.tags);
consumer.start();
arc.mqPushConsumer=consumer;
}catch(MQClientExceptione){
log.error("infoconsumertitle{}",arc.consumerTitel,e);
}
}
}
然后在src/main/resources文件夹中创建目录与文件META-INF/spring.factories里面添加自动配置类即可开启启动配置,我们只需要导入依赖即可:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.xcloud.config.rocketmq.RocketMQConfiguation
接下来在服务中导入依赖,然后通过我们的抽象类获取所有必要信息对消费者进行创建,该步骤会在所有消费者初始化完成后进行,且只会管理是SpringBean的消费者。
下面我们看看如何创建一个消费者,创建消费者的步骤非常简单,只需要继承AbstractRocketConsumer然后再加上Spring的@Component就能够完成消费者的创建,我们可以在类中自定义消费的主题与标签。
在项目可以根据需求当消费者创建失败的时候是否继续启动工程。
创建一个默认的消费者DefaultConsumerMQ.java
@Component
publicclassDefaultConsumerMQextendsAbstractRocketConsumer{
/**
*初始化消费者
*/
@Override
publicvoidinit(){
//设置主题,标签与消费者标题
super.necessary("TopicTest","*","这是标题");
//消费者具体执行逻辑
registerMessageListener(newMessageListenerConcurrently(){
@Override
publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContextcontext){
msgs.forEach(msg->{
System.out.printf("consumermessageboyd%s%n",newString(msg.getBody()));
});
//标记该消息已经被成功消费
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
}
super.necessary("TopicTest","*","这是标题");是必须要设置的,代表该消费者监听TopicTest主题下所有tags,标题那个字段是我自己定义的,所以对于该配置来说没什么意义。
我们可以在这里注入Spring的Bean来进行任意逻辑处理。
创建一个消息发送类进行测试
@Override
publicStringqmtest(@PathVariable("name")Stringname)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException,UnsupportedEncodingException{
Messagemsg=newMessage("TopicTest","tags1",name.getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息到一个Broker
SendResultsendResult=defaultMQProducer.send(msg);
//通过sendResult返回消息是否成功送达
System.out.printf("%s%n",sendResult);
returnnull;
}
我们来通过Http请求测试:
http://localhost:10001/demo/base/mq/helloconsumermessageboydhello
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿consumermessageboyd嘿嘿嘿嘿嘿
好了到这里简单的start算是设计完成了,后面还有一些:顺序消息生产,顺序消费消息,异步消息生产等一系列功能,官人可参照官方去自行处理。
- ActiveMQ没经过大规模吞吐量场景的验证,社区不高不活跃。
- RabbitMQ集群动态扩展麻烦,且与当前程序语言不至于难以定制化。
- kafka支持主要的MQ功能,功能无法达到程序需求的要求,所以不使用,且与当前程序语言不至于难以定制化。
- rocketMQMQ功能较为完善,还是分布式的,扩展性好;支持复杂MQ业务场景。(业务复杂可做首选)
本文内容总结:SpringBoot如何优雅的使用RocketMQ,
原文链接:https://www.cnblogs.com/SimpleWu/p/12112351.html