redis发布订阅Java代码实现过程解析
前言
Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:
redis.clients jedis 2.1.0
由于订阅消息通道需要再tomcat启动时触发,因此,需要创建一个listener监听器,在监听器里实现redis订阅,在web.xml里配置监听器如下:
com.test.listener.InitListener
一、订阅消息(InitListener实现)
redis支持多通道订阅,一个客户端可以同时订阅多个消息通道,如下代码所示,订阅了13个通道。由于订阅机制是线程阻塞的,需要额外开启一个线程专门用于处理订阅消息及接收消息处理。
publicclassInitListenerimplementsServletContextListener{
privateLoggerlogger=Logger.getLogger(InitListener.class);
@Override
publicvoidcontextInitialized(ServletContextEventsce){
logger.info("启动tomcat");//连接redis
MapproMap=PropertyReader.getProperties();
finalStringurl=proMap.get("redis.host");
finalIntegerport=Integer.parseInt(proMap.get("redis.port"));
finalClassPathXmlApplicationContextclassPathXmlApplicationContext=newClassPathXmlApplicationContext("classpath*:applicationContext.xml");
finalRedisSubListenerredisSubListener=(RedisSubListener)classPathXmlApplicationContext.getBean("redisSubListener");
//为防止阻塞tomcat启动,开启线程执行
newThread(newRunnable(){
publicvoidrun(){
//连接redis,建立监听
Jedisjedis=null;
while(true){
//解码资源更新通知,画面选看回复,画面选看停止回复,预案启动,预案停止,轮切启动,轮切停止,预案启动回复,预案停止回复,轮切启动回复,轮切停止回复,监视屏分屏状态通知,画面状态通知
String[]channels=newString[]{"decodeResourceUpdateNtf","tvSplitPlayRsp","tvSplitPlayStopRsp",
"planStartStatusNtf","planStopStatusNtf","pollStartStatusNtf","pollStopStatusNtf",
"planStartRsp","planStopRsp","pollStartRsp","pollStopRsp","tvSplitTypeNtf","tvSplitStatusNtf"};
try{
jedis=newJedis(url,port);
logger.info("redis请求订阅通道");
jedis.subscribe(redisSubListener,channels);
logger.info("redis订阅结束");
}catch(JedisConnectionExceptione){
logger.error("Jedis连接异常,异常信息:"+e);
}catch(IllegalStateExceptione){
logger.error("Jedis异常,异常信息:"+e);
}
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
if(jedis!=null){
jedis=null;
}
}
}})
.start();
}
最后在spring配置文件里接入以下配置:
RedisMessageServiceImpl用于处理接收的redis消息。
二、发布消息
publicclassRedisPublishUtil{
privateLoggerlogger=Logger.getLogger(RedisPublishUtil.class);
publicstaticJedispubJedis;
privatestaticMapproMap=PropertyReader.getProperties();
privatestaticfinalStringredisPort=proMap.get("redis.port");
privatestaticStringurl=proMap.get("redis.host");
privatestaticfinalintport=Integer.parseInt(redisPort);
publicvoidsetPubJedis(Jedisjedis){
RedisPublishUtil.pubJedis=jedis;
}
publicJedisgetPubJedis(){
if(pubJedis==null){
createJedisConnect();
}
//返回对象
returnpubJedis;
}
publicJediscreateJedisConnect(){
//连接redis
logger.info("===创建连接jedis=====");
try{
pubJedis=newJedis(url,port);
}catch(JedisConnectionExceptione){
logger.error("Jedis连接异常,异常信息:"+e.getMessage());
try{
Thread.sleep(1000);
logger.info("发起重新连接jedis");
createJedisConnect();
}catch(InterruptedExceptionexcept){
except.printStackTrace();
}
}
//返回对象
returnpubJedis;
}
//公共发布接口
publicvoidpubRedisMsg(StringmsgType,Stringmsg){
logger.info("redis准备发布消息内容:"+msg);
try{
this.getPubJedis().publish(msgType,msg);
}catch(JedisConnectionExceptione){
logger.error("redis发布消息失败!",e);
this.setPubJedis(null);
logger.info("重新发布消息,channel="+msgType);
pubRedisMsg(msgType,msg);
}
}
}
publicclassPropertyReader{
privatestaticLoggerlogger=Logger.getLogger(PropertyReader.class);
/*
*获得数据库链接的配置文件
*/
publicstaticMapgetProperties(){
logger.info("读取redis配置文件开始。。。");
Propertiesprop=newProperties();
MapproMap=newHashMap();
try{
//读取属性文件redis.properties
InputStreamin=PropertyReader.class.getClassLoader().getResourceAsStream("redis.properties");
prop.load(in);///加载属性列表
Iteratorit=prop.stringPropertyNames().iterator();
while(it.hasNext()){
Stringkey=it.next();
proMap.put(key,prop.getProperty(key));
}
in.close();
logger.info("读取redis配置文件成功。。。");
}catch(Exceptione){
logger.error("读取redis配置文件异常!",e);
e.printStackTrace();
}
returnproMap;
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。