redis发布订阅Java代码实现过程解析
前言
Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:
redis.clients jedis 2.1.0
由于订阅消息通道需要再tomcat启动时触发,因此,需要创建一个listener监听器,在监听器里实现redis订阅,在web.xml里配置监听器如下:
com.test.listener.InitListener
一、订阅消息(InitListener实现)
redis支持多通道订阅,一个客户端可以同时订阅多个消息通道,如下代码所示,订阅了13个通道。由于订阅机制是线程阻塞的,需要额外开启一个线程专门用于处理订阅消息及接收消息处理。
publicclassInitListenerimplementsServletContextListener{ privateLoggerlogger=Logger.getLogger(InitListener.class); @Override publicvoidcontextInitialized(ServletContextEventsce){ logger.info("启动tomcat");//连接redis MapproMap=PropertyReader.getProperties(); finalStringurl=proMap.get("redis.host"); finalIntegerport=Integer.parseInt(proMap.get("redis.port")); finalClassPathXmlApplicationContextclassPathXmlApplicationContext=newClassPathXmlApplicationContext("classpath*:applicationContext.xml"); finalRedisSubListenerredisSubListener=(RedisSubListener)classPathXmlApplicationContext.getBean("redisSubListener"); //为防止阻塞tomcat启动,开启线程执行 newThread(newRunnable(){ publicvoidrun(){ //连接redis,建立监听 Jedisjedis=null; while(true){ //解码资源更新通知,画面选看回复,画面选看停止回复,预案启动,预案停止,轮切启动,轮切停止,预案启动回复,预案停止回复,轮切启动回复,轮切停止回复,监视屏分屏状态通知,画面状态通知 String[]channels=newString[]{"decodeResourceUpdateNtf","tvSplitPlayRsp","tvSplitPlayStopRsp", "planStartStatusNtf","planStopStatusNtf","pollStartStatusNtf","pollStopStatusNtf", "planStartRsp","planStopRsp","pollStartRsp","pollStopRsp","tvSplitTypeNtf","tvSplitStatusNtf"}; try{ jedis=newJedis(url,port); logger.info("redis请求订阅通道"); jedis.subscribe(redisSubListener,channels); logger.info("redis订阅结束"); }catch(JedisConnectionExceptione){ logger.error("Jedis连接异常,异常信息:"+e); }catch(IllegalStateExceptione){ logger.error("Jedis异常,异常信息:"+e); } try{ Thread.sleep(1000); }catch(InterruptedExceptione){ e.printStackTrace(); } if(jedis!=null){ jedis=null; } } }}) .start(); }
最后在spring配置文件里接入以下配置:
RedisMessageServiceImpl用于处理接收的redis消息。
二、发布消息
publicclassRedisPublishUtil{ privateLoggerlogger=Logger.getLogger(RedisPublishUtil.class); publicstaticJedispubJedis; privatestaticMapproMap=PropertyReader.getProperties(); privatestaticfinalStringredisPort=proMap.get("redis.port"); privatestaticStringurl=proMap.get("redis.host"); privatestaticfinalintport=Integer.parseInt(redisPort); publicvoidsetPubJedis(Jedisjedis){ RedisPublishUtil.pubJedis=jedis; } publicJedisgetPubJedis(){ if(pubJedis==null){ createJedisConnect(); } //返回对象 returnpubJedis; } publicJediscreateJedisConnect(){ //连接redis logger.info("===创建连接jedis====="); try{ pubJedis=newJedis(url,port); }catch(JedisConnectionExceptione){ logger.error("Jedis连接异常,异常信息:"+e.getMessage()); try{ Thread.sleep(1000); logger.info("发起重新连接jedis"); createJedisConnect(); }catch(InterruptedExceptionexcept){ except.printStackTrace(); } } //返回对象 returnpubJedis; } //公共发布接口 publicvoidpubRedisMsg(StringmsgType,Stringmsg){ logger.info("redis准备发布消息内容:"+msg); try{ this.getPubJedis().publish(msgType,msg); }catch(JedisConnectionExceptione){ logger.error("redis发布消息失败!",e); this.setPubJedis(null); logger.info("重新发布消息,channel="+msgType); pubRedisMsg(msgType,msg); } } }
publicclassPropertyReader{ privatestaticLoggerlogger=Logger.getLogger(PropertyReader.class); /* *获得数据库链接的配置文件 */ publicstaticMapgetProperties(){ logger.info("读取redis配置文件开始。。。"); Propertiesprop=newProperties(); Map proMap=newHashMap (); try{ //读取属性文件redis.properties InputStreamin=PropertyReader.class.getClassLoader().getResourceAsStream("redis.properties"); prop.load(in);///加载属性列表 Iterator it=prop.stringPropertyNames().iterator(); while(it.hasNext()){ Stringkey=it.next(); proMap.put(key,prop.getProperty(key)); } in.close(); logger.info("读取redis配置文件成功。。。"); }catch(Exceptione){ logger.error("读取redis配置文件异常!",e); e.printStackTrace(); } returnproMap; } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。