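Spring Boot Tutorial: Implementing Delayed Messages with ActiveMQ
I. Installing ActiveMQ
Deploying ActiveMQ on Linux: https://www.nhooo.com/article/162320.htm
For Linux, follow the installation steps in the article above; they are not repeated here.
Installing ActiveMQ on Windows:
Download the latest release archive from the official site (http://activemq.apache.org/download-archives.html); I used 5.15.9. Extract it locally (I extracted it to the Dev directory on drive D). Go into the bin directory of the extracted folder, then, on a 64-bit machine, into the win64 directory, and double-click activemq.bat to start the broker: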
wrapper  | --> Wrapper Started as Console
wrapper  | Launching a JVM...
jvm 1    | Wrapper (Version 3.2.3) http://wrapper.tanukisoftware.org
jvm 1    |   Copyright 1999-2006 Tanuki Software, Inc.  All Rights Reserved.
jvm 1    |
jvm 1    | Java Runtime: Oracle Corporation 1.8.0_181 C:\Program Files\Java\jre1.8.0_181
jvm 1    |   Heap sizes: current=125952k  free=115299k  max=932352k
jvm 1    |     JVM args: -Dactivemq.home=../.. -Dactivemq.base=../.. -Djavax.net.ssl.keyStorePassword=password -Djavax.net.ssl.trustStorePassword=password -Djavax.net.ssl.keyStore=../../conf/broker.ks -Djavax.net.ssl.trustStore=../../conf/broker.ts -Dcom.sun.management.jmxremote -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties -Dactivemq.conf=../../conf -Dactivemq.data=../../data -Djava.security.auth.login.config=../../conf/login.config -Xmx1024m -Djava.library.path=../../bin/win64 -Dwrapper.key=mChNCWMZ2FoXhZ9g -Dwrapper.port=32000 -Dwrapper.jvm.port.min=31000 -Dwrapper.jvm.port.max=31999 -Dwrapper.pid=3500 -Dwrapper.version=3.2.3 -Dwrapper.native_library=wrapper -Dwrapper.cpu.timeout=10 -Dwrapper.jvmid=1
jvm 1    | Extensions classpath:
jvm 1    |   [..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra]
jvm 1    | ACTIVEMQ_HOME: ..\..
jvm 1    | ACTIVEMQ_BASE: ..\..
jvm 1    | ACTIVEMQ_CONF: ..\..\conf
jvm 1    | ACTIVEMQ_DATA: ..\..\data
jvm 1    | Loading message broker from: xbean:activemq.xml
jvm 1    |  INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d: startup date [Fri May 24 15:16:21 CST 2019]; root of context hierarchy
jvm 1    |  INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb]
jvm 1    |  INFO | PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage] started
jvm 1    |  INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) is starting
jvm 1    |  INFO | Listening for connections at: tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector openwire started
jvm 1    |  INFO | Listening for connections at: amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector amqp started
jvm 1    |  INFO | Listening for connections at: stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector stomp started
jvm 1    |  INFO | Listening for connections at: mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector mqtt started
jvm 1    |  INFO | Starting Jetty server
jvm 1    |  INFO | Creating Jetty connector
jvm 1    |  WARN | ServletContext@o.e.j.s.ServletContextHandler@17bc7c8a{/,null,STARTING} has uncovered http methods for path: /
jvm 1    |  INFO | Listening for connections at ws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
jvm 1    |  INFO | Connector ws started
jvm 1    |  INFO | Apache ActiveMQ 5.15.9 (localhost, ID:wulf00-51057-1558682182909-0:1) started
jvm 1    |  INFO | For help or more information please see: http://activemq.apache.org
jvm 1    |  WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb only has 92649 mb of usable space. - resetting to maximum available disk space: 92649 mb
jvm 1    |  INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1    |  INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
jvm 1    |  INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
jvm 1    |  INFO | Initializing Spring FrameworkServlet 'dispatcher'
jvm 1    |  INFO | No Spring WebApplicationInitializer types detected on classpath
jvm 1    |  INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml
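The web console listens on port 8161 by default. Open http://localhost:8161/admin and log in with the username admin and the password admin to reach the console page:
Use the Queues tab at the top left to create a queue named vboxlog: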
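II. Modifying the ActiveMQ configuration file
In conf/activemq.xml, add the attribute schedulerSupport="true" to the broker element, then restart the broker. Without this attribute the broker ignores the delay property and delivers the message immediately:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">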
III. Creating the Spring Boot project
1. Configure the ActiveMQ connection factory. The trusted packages must be configured; otherwise the consumer fails to deserialize the ObjectMessage payload.
package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url) {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        // Packages whose classes may be deserialized from ObjectMessage payloads
        List<String> models = new ArrayList<>();
        models.add("com.example.demoactivemq.domain");
        factory.setTrustedPackages(models);
        return factory;
    }
}
Message entity class
package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author shanks on 2019-11-12
 */
@Builder
@Data
public class MessageModel implements Serializable {
    private String titile;
    private String message;
}
Producer
package com.example.demoactivemq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;

/**
 * Message producer.
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * Sends a message immediately.
     *
     * @param destination the queue to send to
     * @param message     the message to send
     */
    public <T extends Serializable> void send(Destination destination, T message) {
        template.convertAndSend(destination, message);
    }

    /**
     * Sends a message whose delivery is delayed by the broker.
     *
     * @param destination the queue to send to
     * @param data        the message to send
     * @param time        the delay in milliseconds
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // Get the connection factory
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            // Open a connection
            connection = connectionFactory.createConnection();
            connection.start();
            // Create a session: true enables transactions, false disables them
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // Create a producer for the destination queue
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            // Set the delay; the broker holds the message for this many milliseconds
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // Send the message and commit the transacted session
            producer.send(message);
            log.info("发送消息:{}", data);
            session.commit();
        } catch (Exception e) {
            log.error("Failed to send delayed message", e);
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                log.error("Failed to release JMS resources", e);
            }
        }
    }
}
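Besides AMQ_SCHEDULED_DELAY, the org.apache.activemq.ScheduledMessage interface also defines AMQ_SCHEDULED_PERIOD and AMQ_SCHEDULED_REPEAT for repeated delivery. The following is a minimal, JDK-only sketch of how the property values could be prepared from a java.time.Duration, so that the unit (milliseconds) is never left to guesswork. The class ScheduleHeaders and its methods are hypothetical helpers introduced here for illustration; they are not part of ActiveMQ or of the original project. The string keys are assumed to mirror the constant values in ScheduledMessage.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not an ActiveMQ class): builds scheduling properties for a message.
final class ScheduleHeaders {
    // Assumed to mirror the String constants in org.apache.activemq.ScheduledMessage.
    static final String DELAY = "AMQ_SCHEDULED_DELAY";
    static final String PERIOD = "AMQ_SCHEDULED_PERIOD";
    static final String REPEAT = "AMQ_SCHEDULED_REPEAT";

    private ScheduleHeaders() {
    }

    /** Properties for a single delivery after the given delay. */
    static Map<String, Object> delayed(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(DELAY, delay.toMillis()); // long, in milliseconds
        return headers;
    }

    /** Properties for an initial delay followed by repeated deliveries. */
    static Map<String, Object> repeating(Duration delay, Duration period, int repeat) {
        if (period.isNegative() || repeat < 0) {
            throw new IllegalArgumentException("period and repeat must not be negative");
        }
        Map<String, Object> headers = delayed(delay);
        headers.put(PERIOD, period.toMillis()); // long, in milliseconds
        headers.put(REPEAT, repeat);            // int, number of repeats
        return headers;
    }

    public static void main(String[] args) {
        // The 10-second delay used in this tutorial.
        System.out.println(delayed(Duration.ofSeconds(10))); // prints {AMQ_SCHEDULED_DELAY=10000}
    }
}
```

In delaySend, each Long entry would be applied with message.setLongProperty(key, value) and the repeat count with message.setIntProperty(key, value) before producer.send(message).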
Consumer
package com.example.demoactivemq.producer;

import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer.
 */
@Component
@Slf4j
public class Consumer {

    @JmsListener(destination = "delay.queue")
    public void receiveQueue(MessageModel message) {
        log.info("收到消息:{}", message);
    }
}
application.yml
spring:
  activemq:
    broker-url: tcp://localhost:61616
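If the application cannot connect, it can help to first confirm that something is actually listening at the configured broker-url. The sketch below is a JDK-only check, assuming a plain tcp://host:port URL like the one above; composite URLs such as failover:(...) are not handled. The class BrokerUrlCheck and its methods are hypothetical names introduced for illustration and are not part of Spring or ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper: checks whether a TCP listener exists at a broker URL.
final class BrokerUrlCheck {

    private BrokerUrlCheck() {
    }

    /** Extracts host and port from a simple URL such as tcp://localhost:61616. */
    static InetSocketAddress parse(String brokerUrl) {
        URI uri = URI.create(brokerUrl);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("expected scheme://host:port but got " + brokerUrl);
        }
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(String brokerUrl, int timeoutMillis) {
        InetSocketAddress target = parse(brokerUrl);
        try (Socket socket = new Socket()) {
            // Resolve the host name here; an unknown host surfaces as an IOException.
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String url = "tcp://localhost:61616";
        System.out.println(url + " reachable: " + isReachable(url, 1000));
    }
}
```

A successful TCP connection only shows that the port is open; it does not prove that the listener is an ActiveMQ broker or that scheduler support is enabled.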
Test class
package com.example.demoactivemq;

import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// With JUnit 5, @SpringBootTest (Spring Boot 2.1+) already registers the Spring extension,
// so the JUnit 4 @RunWith(SpringRunner.class) annotation is not needed.
@SpringBootTest(classes = DemoActivemqApplication.class)
class DemoActivemqApplicationTests {

    /**
     * Message producer
     */
    @Autowired
    private Producer producer;

    /**
     * Immediate message test
     */
    @Test
    public void test() {
        MessageModel messageModel = MessageModel.builder()
                .message("测试消息")
                .titile("消息000")
                .build();
        // Send the message
        producer.send(Producer.DEFAULT_QUEUE, messageModel);
    }

    /**
     * Delayed message test
     */
    @Test
    public void test2() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .titile("延迟10秒执行")
                    .message("测试消息" + i)
                    .build();
            // Send a message delayed by 10 seconds (10000 ms)
            producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);
        }
        // Sleep for 100 seconds so the delayed messages can be delivered and consumed
        Thread.sleep(100000L);
    }
}
Execution result (发送消息 means "message sent" and 收到消息 means "message received"; each message is received about 10 seconds after it was sent):
2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0)
2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1)
2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2)
2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3)
2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
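To check that the observed delay matches the configured 10000 ms, the send and receive timestamps can be subtracted. The sketch below does this for message 0 with java.time; the class name ObservedDelay is a hypothetical helper, and the timestamp pattern is an assumption based on the log lines in this article.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: measures the gap between two log timestamps.
final class ObservedDelay {
    // Pattern assumed from the log lines shown in this article.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    private ObservedDelay() {
    }

    /** Milliseconds elapsed between the "sent" and "received" timestamps. */
    static long millisBetween(String sentAt, String receivedAt) {
        LocalDateTime sent = LocalDateTime.parse(sentAt, LOG_TIME);
        LocalDateTime received = LocalDateTime.parse(receivedAt, LOG_TIME);
        return Duration.between(sent, received).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps of message 0 (测试消息0) in the execution result.
        long delay = millisBetween("2019-11-12 22:18:52.939", "2019-11-12 22:19:03.012");
        System.out.println(delay); // prints 10073
    }
}
```

For the five messages in this run the gap is between 10051 ms and 10073 ms, that is, the requested 10 seconds plus a small scheduling and delivery overhead.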