nodejs redis 发布订阅机制封装实现方法及实例代码
nodejsredis发布订阅机制封装
最近项目使用redis,对publish和subscribe的使用进行了了解,并进行了封装。
varconfig=require('../config/config');
varlog=require("./loghelp");
varredis=require("redis");
functioninitialclient(param){
varoption={host:config.redis.host,port:config.redis.port};
if(param)
{
option=Object.assign(option,param);
}
redis.print
letclient=redis.createClient(option);
client.on("error",function(err){
log.error(err);
});
returnclient;
}
/*example:
*letchannel="ryan";
redis.pubSub.registerHandlers("ryan",msg=>console.log(msg));
redis.pubSub.subscribe(channel);
redis.pubSub.publish(channel,"hellofromchen");*/
classPubSub
{
constructor(){
this.sub=initialclient();
this.handlers=newMap();
this.subAction=(channle,message)=>{
letactions=this.handlers.get(channle)||newSet();
for(letactionofactions)
{
action(message);
}
}
this.alredyPublishs=[];
this.subConnected=false;
}
publish(channel,message)
{
letaction=()=>{
letpub=initialclient();
pub.publish(channel,message);
};
if(this.subConnected===false)
{
this.alredyPublishs.push(action);
}
else
action();
}
registerHandlers(channel,action)
{
varactions=this.handlers.get(channel)||newSet();
actions.add(action);
this.handlers.set(channel,actions);
}
subscribe(channel)
{
letself=this;
this.sub.subscribe(channel,function(err,reply){
if(err)
log.error(err);
self.subConnected=true;
for(letpublishofself.alredyPublishs)
publish();
console.log(reply);
});
this.sub.on("message",function(channel,message){
self.subAction(channel,message);
});
}
tearDown()
{
this.sub.quit();
}
}
然后通过exports.pubsub=newPubSub()将其暴漏,可保证是单例。在程序启动时,调用
registerHandlers 注册特定通道的处理逻辑,然后调用
subscribe 订阅通道。
在合适时机调用publish,这个机制可以实现分布式下所有客户端watch同一个数据的更改。
本人全手工打造的dotnetcorewebapi框架,可实现快速开发。
地址:http://xiazai.jb51.net/201612/yuanma/WebApiCore-master(jb51.net).rar。
1采用DDD模式开发,充血模型2添加Dapper扩展,默认实现增删改查基本操作。利用AutoMapper做实体转换,减少重复劳动。3依赖注入融合Autofac,仓储层和应用层自动注入4实现JWT验证5加入swagger文档6单元测试添加了xunit,MyMvc可以方便对webapi测试7数据库版本控制
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!