浅谈Node异步编程的机制
本文介绍了Node异步编程,分享给大家,具体如下:
目前的异步编程主要解决方案有:
- 事件发布/订阅模式
- Promise/Deferred模式
- 流程控制库
事件发布/订阅模式
Node自身提供了events模块,可以轻松实现事件的发布/订阅
//订阅 emmiter.on("event1",function(message){ console.log(message); }) //发布 emmiter.emit("event1","Iammesaage!");
侦听器可以很灵活地添加和删除,使得事件和具体处理逻辑之间可以很轻松的关联和解耦
事件发布/订阅模式常常用来解耦业务逻辑,事件发布者无需关注订阅的侦听器如何实现业务逻辑,甚至不用关注有多少个侦听器存在,数据通过消息的方式可以很灵活的进行传递。
下面的HTTP就是典型的应用场景
varreq=http.request(options,function(res){ res.on('data',function(chunk){ console.log('Body:'+chunk); }) res.on('end',function(){ //TODO }) })
如果一个事件添加了超过10个侦听器,将会得到一条警告,可以通过调用emmite.setMaxListeners(0)将这个限制去掉
继承events模块
varevents=require('events'); functionStream(){ events.EventEmiiter.call(this); } util.inherits(Stream,events.EventEmitter);
利用事件队列解决雪崩问题
所谓雪崩问题,就是在高访问量,大并发量的情况下缓存失效的情况,此时大量的请求同时融入数据库中,数据库无法同时承受如此大的查询请求,进而往前影响到网站整体的响应速度
解决方案:
varproxy=newevents.EventEmitter(); varstatus="ready"; varseletc=function(callback){ proxy.once("selected",callback);//为每次请求订阅这个查询时间,推入事件回调函数队列 if(status==='ready'){ status='pending';//设置状态为进行中以防止引起多次查询操作 db.select("SQL",function(results){ proxy.emit("selected",results);//查询操作完成后发布时间 status='ready';//重新定义为已准备状态 }) } }
多异步之间的协作方案
以上情况事件与侦听器的关系都是一对多的,但在异步编程中,也会出现事件与侦听器多对一的情况。
这里以渲染页面所需要的模板读取、数据读取和本地化资源读取为例简要介绍一下
varcount=0; varresults={}; vardone=function(key,value){ result[key]=value; count++; if(count===3){ render(results); } } fs.readFile(template_path,"utf8",function(err,template){ done('template',template) }) db.query(sql,function(err,data){ done('data',data); }) l10n.get(function(err,resources){ done('resources',resources) })
偏函数方案
varafter=function(times,callback){ varcount=0,result={}; returnfunction(key,value){ results[key]=value; count++; if(count===times){ callback(results); } } } vardone=after(times,render); varemitter=newevents.Emitter(); emitter.on('done',done);//一个侦听器 emitter.on('done',other);//如果业务增长,可以完成多对多的方案 fs.readFile(template_path,"utf8",function(err,template){ emitter.emit('done','template',template); }) db.query(sql,function(err,data){ emitter.emit('done','data',data); }) l10n.get(function(err,resources){ emitter.emit('done','resources',resources) })
引入EventProxy模块方案
varproxy=newEventProxy(); proxy.all('template','data','resources',function(template,data,resources){ //TODO }) fs.readFile(template_path,'utf8',function(err,template){ proxy.emit('template',template); }) db.query(sql,function(err,data){ proxy.emit('data',data); }) l10n.get(function(err,resources){ proxy.emit('resources',resources); })
Promise/Deferred模式
以上使用事件的方式时,执行流程都需要被预先设定,这是发布/订阅模式的运行机制所决定的。
$.get('/api',{ success:onSuccess, err:onError, complete:onComplete }) //需要严谨设置目标
那么是否有一种先执行异步调用,延迟传递处理的方式的?接下来要说的就是针对这种情况的方式:Promise/Deferred模式
Promise/A
Promise/A提议对单个异步操作做出了这样的抽象定义:
- Promise操作只会处在三种状态的一种:未完成态,完成态和失败态。
- Promise的状态只会出现从未完成态向完成态或失败态转化,不能逆反,完成态和失败态不能相互转化
- Promise的状态一旦转化,就不能被更改。
一个Promise对象只要具备then()即可
- 接受完成态、错误态的回调方法
- 可选地支持progress事件回调作为第三个方法
- then()方法只接受function对象,其余对象将被忽略
- then()方法继续返回Promise对象,以实现链式调用
通过Node的events模块来模拟一个Promise的实现
varPromise=function(){ EventEmitter.call(this) } util.inherits(Promise,EventEmitter); Promise.prototype.then=function(fulfilledHandler,errHandler,progeressHandler){ if(typeoffulfilledHandler==='function'){ this.once('success',fulfilledHandler);//实现监听对应事件 } if(typeoferrorHandler==='function'){ this.once('error',errorHandler) } if(typeofprogressHandler==='function'){ this.on('progress',progressHandler); } returnthis; }
以上通过then()将回调函数存放起来,接下来就是等待success、error、progress事件被触发,实现这个功能的对象称为Deferred对象,即延迟对象。
varDeferred=function(){ this.state='unfulfilled'; this.promise=newPromise(); } Deferred.prototype.resolve=function(obj){//当异步完成后可将resolve作为回调函数,触发相关事件 this.state='fulfilled'; this.promise.emit('success',obj); } Deferred.prototype.reject=function(err){ this.state='failed'; this.promise.emit('error',err); } Deferred.prototype.progress=function(data){ this.promise.emit('progress',data) }
因此,可以对一个典型的响应对象进行封装
res.setEncoding('utf8'); res.on('data',function(chunk){ console.log("Body:"+chunk); }) res.on('end',function(){ //done }) res.on('error',function(err){ //error }
转换成
res.then(function(){ //done },function(err){ //error },function(chunk){ console.log('Body:'+chunk); })
要完成上面的转换,首先需要对res对象进行封装,对data,end,error等事件进行promisify
varpromisify=function(res){ vardeferred=newDeferred();//创建一个延迟对象来在res的异步完成回调中发布相关事件 varresult='';//用来在progress中持续接收数据 res.on('data',function(chunk){//res的异步操作,回调中发布事件 result+=chunk; deferred.progress(chunk); }) res.on('end',function(){ deferred.resolve(result); }) res.on('error',function(err){ deferred.reject(err); }); returndeferred.promise//返回deferred.promise,让外界不能改变deferred的状态,只能让promise的then()方法去接收外界来侦听相关事件。 } promisify(res).then(function(){ //done },function(err){ //error },function(chunk){ console.log('Body:'+chunk); })
以上,它将业务中不可变的部分封装在了Deferred中,将可变的部分交给了Promise
Promise中的多异步协作
Deferred.prototype.all=function(promises){ varcount=promises.length;//记录传进的promise的个数 varthat=this;//保存调用all的对象 varresults=[];//存放所有promise完成的结果 promises.forEach(function(promise,i){//对promises逐个进行调用 promise.then(function(data){//每个promise成功之后,存放结果到result中,count--,直到所有promise被处理完了,才出发deferred的resolve方法,发布事件,传递结果出去 count--; result[i]=data; if(count===0){ that.resolve(results); } },function(err){ that.reject(err); }); }); returnthis.promise;//返回promise来让外界侦听这个deferred发布的事件。 } varpromise1=readFile('foo.txt','utf-8');//这里的文件读取已经经过promise化 varpromise2=readFile('bar.txt','utf-8'); vardeferred=newDeferred(); deferred.all([promise1,promise2]).thne(function(results){//promise1和promise2的then方法在deferred内部的all方法所调用,用于同步所有的promise //TODO },function(err){ //TODO })
支持序列执行的Promise
尝试改造一下代码以实现链式调用
varDeferred=function(){ this.promise=newPromise() } //完成态 Deferred.prototype.resolve=function(obj){ varpromise=this.promise; varhandler; while((handler=promise.queue.shift())){ if(handler&&handler.fulfilled){ varret=handler.fulfilled(obj); if(ret&&ret.isPromise){ ret.queue=promise.queue; this.promise=ret; return; } } } } //失败态 Deferred.prototype.reject=function(err){ varpromise=this.promise; varhandler; while((handler=promise.queue.shift())){ if(handler&&handler.error){ varret=handler.error(err); if(ret&&ret.isPromise){ ret.queue=promise.queue; this.promise=ret; return } } } } //生成回调函数 Deferred.prototype.callback=function(){ varthat=this; returnfunction(err,file){ if(err){ returnthat.reject(err); } that.resolve(file) } } varPromise=function(){ this.queue=[];//队列用于存储待执行的回到函数 this.isPromise=true; }; Promise.prototype.then=function(fulfilledHandler,errorHandler,progressHandler){ varhandler={}; if(typeoffulfilledHandler==='function'){ handler.fulfilled=fulfilledHandler; } if(typeoferrorHandler==='function'){ handler.error=errorHandler; } this.queue.push(handler); returnthis; } varreadFile1=function(file,encoding){ vardeferred=newDeferred(); fs.readFile(file,encoding,deferred.callback()); returndeferred.promise; } varreadFile2=function(file,encoding){ vardeferred=newDeferred(); fs.readFile(file,encoding,deferred.callback()); returndeferred.promise; } readFile1('file1.txt','utf8').then(function(file1){ returnreadFile2(file1.trim(),'utf8') }).then(function(file2){ console.log(file2) })
流程控制库另外进行总结
参考《深入浅出node.js》一书,想学学习可以下载电子书,下载地址:https://www.nhooo.com/books/481114.html
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。