Spring Boot教程之利用ActiveMQ实现延迟消息
一、安装activeMQ
Linux环境ActiveMQ部署方法:https://www.nhooo.com/article/162320.htm
安装步骤参照上面这篇文章,本文不做介绍
Windows下安装ActiveMQ:
到官网(http://activemq.apache.org/download-archives.html)下载最新发布的压缩包(我下的是5.15.9)到本地后解压(我解压到D盘Dev目录下)即可。进入解压后的bin目录,我是64位机器,再进入win64目录后,双击activemq.bat启动:
wrapper|-->WrapperStartedasConsole wrapper|LaunchingaJVM... jvm1|Wrapper(Version3.2.3)http://wrapper.tanukisoftware.org jvm1|Copyright1999-2006TanukiSoftware,Inc.AllRightsReserved. jvm1| jvm1|JavaRuntime:OracleCorporation1.8.0_181C:\ProgramFiles\Java\jre1.8.0_181 jvm1|Heapsizes:current=125952kfree=115299kmax=932352k jvm1|JVMargs:-Dactivemq.home=../..-Dactivemq.base=../..-Djavax.net.ssl.keyStorePassword=password-Djavax.net.ssl.trustStorePassword=password-Djavax.net.ssl.keyStore=../../conf/broker.ks-Djavax.net.ssl.trustStore=../../conf/broker.ts-Dcom.sun.management.jmxremote-Dorg.apache.activemq.UseDedicatedTaskRunner=true-Djava.util.logging.config.file=logging.properties-Dactivemq.conf=../../conf-Dactivemq.data=../../data-Djava.security.auth.login.config=../../conf/login.config-Xmx1024m-Djava.library.path=../../bin/win64-Dwrapper.key=mChNCWMZ2FoXhZ9g-Dwrapper.port=32000-Dwrapper.jvm.port.min=31000-Dwrapper.jvm.port.max=31999-Dwrapper.pid=3500-Dwrapper.version=3.2.3-Dwrapper.native_library=wrapper-Dwrapper.cpu.timeout=10-Dwrapper.jvmid=1 jvm1|Extensionsclasspath: jvm1|[..\..\lib,..\..\lib\camel,..\..\lib\optional,..\..\lib\web,..\..\lib\extra] jvm1|ACTIVEMQ_HOME:..\.. jvm1|ACTIVEMQ_BASE:..\.. jvm1|ACTIVEMQ_CONF:..\..\conf jvm1|ACTIVEMQ_DATA:..\..\data jvm1|Loadingmessagebrokerfrom:xbean:activemq.xml jvm1|INFO|Refreshingorg.apache.activemq.xbean.XBeanBrokerFactory$1@f0ef68d:startupdate[FriMay2415:16:21CST2019];rootofcontexthierarchy jvm1|INFO|UsingPersistenceAdapter:KahaDBPersistenceAdapter[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadb] jvm1|INFO|PListStore:[D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\localhost\tmp_storage]started jvm1|INFO|ApacheActiveMQ5.15.9(localhost,ID:wulf00-51057-1558682182909-0:1)isstarting jvm1|INFO|Listeningforconnectionsat:tcp://wulf00:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm1|INFO|Connectoropenwirestarted jvm1|INFO|Listeningforconnectionsat:amqp://wulf00:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm1|INFO|Connectoramqpstarted jvm1|INFO|Listeningforconnectionsat:stomp://wulf00:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm1|INFO|Connectorstompstarted jvm1|INFO|Listeningforconnectionsat:mqtt://wulf00:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm1|INFO|Connectormqttstarted jvm1|INFO|StartingJettyserver jvm1|INFO|CreatingJettyconnector jvm1|WARN|ServletContext@o.e.j.s.ServletContextHandler@17bc7c8a{/,null,STARTING}hasuncoveredhttpmethodsforpath:/ jvm1|INFO|Listeningforconnectionsatws://wulf00:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm1|INFO|Connectorwsstarted jvm1|INFO|ApacheActiveMQ5.15.9(localhost,ID:wulf00-51057-1558682182909-0:1)started jvm1|INFO|Forhelpormoreinformationpleasesee:http://activemq.apache.org jvm1|WARN|Storelimitis102400mb(currentstoreusageis0mb).Thedatadirectory:D:\Dev\apache-activemq-5.15.9\bin\win64\..\..\data\kahadbonlyhas92649mbofusablespace.-resettingtomaximumavailablediskspace:92649mb jvm1|INFO|NoSpringWebApplicationInitializertypesdetectedonclasspath jvm1|INFO|ActiveMQWebConsoleavailableathttp://0.0.0.0:8161/ jvm1|INFO|ActiveMQJolokiaRESTAPIavailableathttp://0.0.0.0:8161/api/jolokia/ jvm1|INFO|InitializingSpringFrameworkServlet'dispatcher' jvm1|INFO|NoSpringWebApplicationInitializertypesdetectedonclasspath jvm1|INFO|jolokia-agent:Usingpolicyaccessrestrictorclasspath:/jolokia-access.xml
默认端口8161,访问下http://localhost:8161/admin,用户名密码都是admin,进入控制台页面:
我们用坐上方的Queues来创建一个叫vboxlog的队列:
二、修改activeMQ配置文件
broker新增配置信息schedulerSupport="true"
">
三、创建SpringBoot工程
1、配置ActiveMQ工厂信息,信任包必须配置否则会报错
packagecom.example.demoactivemq.config; importorg.apache.activemq.ActiveMQConnectionFactory; importorg.apache.activemq.RedeliveryPolicy; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importjava.util.ArrayList; importjava.util.List; /** *@authorshankson2019-11-12 */ @Configuration publicclassActiveMqConfig{ @Bean publicActiveMQConnectionFactoryfactory(@Value("${spring.activemq.broker-url}")Stringurl){ ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(url); //设置信任序列化包集合 Listmodels=newArrayList<>(); models.add("com.example.demoactivemq.domain"); factory.setTrustedPackages(models); returnfactory; } }
消息实体类
packagecom.example.demoactivemq.domain; importlombok.Builder; importlombok.Data; importjava.io.Serializable; /** *@authorshankson2019-11-12 */ @Builder @Data publicclassMessageModelimplementsSerializable{ privateStringtitile; privateStringmessage; }
生产者
packagecom.example.demoactivemq.producer; importlombok.extern.slf4j.Slf4j; importorg.apache.activemq.ScheduledMessage; importorg.apache.activemq.command.ActiveMQQueue; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.boot.autoconfigure.jms.JmsProperties; importorg.springframework.jms.core.JmsMessagingTemplate; importorg.springframework.stereotype.Service; importjavax.jms.*; importjava.io.Serializable; /** *消息生产者 * *@authorshanks */ @Service @Slf4j publicclassProducer{ publicstaticfinalDestinationDEFAULT_QUEUE=newActiveMQQueue("delay.queue"); @Autowired privateJmsMessagingTemplatetemplate; /** *发送消息 * *@paramdestinationdestination是发送到的队列 *@parammessagemessage是待发送的消息 */ publicvoidsend(Destinationdestination,Tmessage){ template.convertAndSend(destination,message); } /** *延时发送 * *@paramdestination发送的队列 *@paramdata发送的消息 *@paramtime延迟时间 */ public voiddelaySend(Destinationdestination,Tdata,Longtime){ Connectionconnection=null; Sessionsession=null; MessageProducerproducer=null; //获取连接工厂 ConnectionFactoryconnectionFactory=template.getConnectionFactory(); try{ //获取连接 connection=connectionFactory.createConnection(); connection.start(); //获取session,true开启事务,false关闭事务 session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //创建一个消息队列 producer=session.createProducer(destination); producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue()); ObjectMessagemessage=session.createObjectMessage(data); //设置延迟时间 message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,time); //发送消息 producer.send(message); log.info("发送消息:{}",data); session.commit(); }catch(Exceptione){ e.printStackTrace(); }finally{ try{ if(producer!=null){ producer.close(); } if(session!=null){ session.close(); } if(connection!=null){ connection.close(); } }catch(Exceptione){ e.printStackTrace(); } } } }
消费者
packagecom.example.demoactivemq.producer; importcom.example.demoactivemq.domain.MessageModel; importlombok.extern.slf4j.Slf4j; importorg.springframework.jms.annotation.JmsListener; importorg.springframework.stereotype.Component; /** *消费者 */ @Component @Slf4j publicclassConsumer{ @JmsListener(destination="delay.queue") publicvoidreceiveQueue(MessageModelmessage){ log.info("收到消息:{}",message); } }
application.yml
spring: activemq: broker-url:tcp://localhost:61616
测试类
packagecom.example.demoactivemq; importcom.example.demoactivemq.domain.MessageModel; importcom.example.demoactivemq.producer.Producer; importorg.junit.jupiter.api.Test; importorg.junit.runner.RunWith; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.boot.test.context.SpringBootTest; importorg.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes=DemoActivemqApplication.class) @RunWith(SpringRunner.class) classDemoActivemqApplicationTests{ /** *消息生产者 */ @Autowired privateProducerproducer; /** *及时消息队列测试 */ @Test publicvoidtest(){ MessageModelmessageModel=MessageModel.builder() .message("测试消息") .titile("消息000") .build(); //发送消息 producer.send(Producer.DEFAULT_QUEUE,messageModel); } /** *延时消息队列测试 */ @Test publicvoidtest2(){ for(inti=0;i<5;i++){ MessageModelmessageModel=MessageModel.builder() .titile("延迟10秒执行") .message("测试消息"+i) .build(); //发送延迟消息 producer.delaySend(Producer.DEFAULT_QUEUE,messageModel,10000L); } try{ //休眠100秒,等等消息执行 Thread.currentThread().sleep(100000L); }catch(InterruptedExceptione){ e.printStackTrace(); } } }
执行结果
2019-11-1222:18:52.939 INFO17263---[ main]c.e.demoactivemq.producer.Producer :发送消息:MessageModel(titile=延迟10秒执行,message=测试消息0)
2019-11-1222:18:52.953 INFO17263---[ main]c.e.demoactivemq.producer.Producer :发送消息:MessageModel(titile=延迟10秒执行,message=测试消息1)
2019-11-1222:18:52.958 INFO17263---[ main]c.e.demoactivemq.producer.Producer :发送消息:MessageModel(titile=延迟10秒执行,message=测试消息2)
2019-11-1222:18:52.964 INFO17263---[ main]c.e.demoactivemq.producer.Producer :发送消息:MessageModel(titile=延迟10秒执行,message=测试消息3)
2019-11-1222:18:52.970 INFO17263---[ main]c.e.demoactivemq.producer.Producer :发送消息:MessageModel(titile=延迟10秒执行,message=测试消息4)
2019-11-1222:19:03.012 INFO17263---[enerContainer-1]c.e.demoactivemq.producer.Consumer :收到消息:MessageModel(titile=延迟10秒执行,message=测试消息0)
2019-11-1222:19:03.017 INFO17263---[enerContainer-1]c.e.demoactivemq.producer.Consumer :收到消息:MessageModel(titile=延迟10秒执行,message=测试消息1)
2019-11-1222:19:03.019 INFO17263---[enerContainer-1]c.e.demoactivemq.producer.Consumer :收到消息:MessageModel(titile=延迟10秒执行,message=测试消息2)
2019-11-1222:19:03.020 INFO17263---[enerContainer-1]c.e.demoactivemq.producer.Consumer :收到消息:MessageModel(titile=延迟10秒执行,message=测试消息3)
2019-11-1222:19:03.021 INFO17263---[enerContainer-1]c.e.demoactivemq.producer.Consumer :收到消息:MessageModel(titile=延迟10秒执行,message=测试消息4)
比你优秀的人比你还努力,你有什么资格不去奋斗!!!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。