浅谈Node.js之异步流控制
前言
在没有深度使用函数回调的经验的时候,去看这些内容还是有一点吃力的。由于Node.js独特的异步特性,才出现了“回调地狱”的问题,这篇文章中,我比较详细的记录了如何解决异步流问题。
文章会很长,而且这篇是对异步流模式的解释。文中会使用一个简单的网络蜘蛛的例子,它的作用是抓取指定URL的网页内容并保存在项目中,在文章的最后,可以找到整篇文章中的源码demo。
1.原生JavaScript模式
本篇不针对初学者,因此会省略掉大部分的基础内容的讲解:
(spider_v1.js)
constrequest=require("request");
constfs=require("fs");
constmkdirp=require("mkdirp");
constpath=require("path");
constutilities=require("./utilities");
functionspider(url,callback){
constfilename=utilities.urlToFilename(url);
console.log(`filename:${filename}`);
fs.exists(filename,exists=>{
if(!exists){
console.log(`Downloading${url}`);
request(url,(err,response,body)=>{
if(err){
callback(err);
}else{
mkdirp(path.dirname(filename),err=>{
if(err){
callback(err);
}else{
fs.writeFile(filename,body,err=>{
if(err){
callback(err);
}else{
callback(null,filename,true);
}
});
}
});
}
});
}else{
callback(null,filename,false);
}
});
}
spider(process.argv[2],(err,filename,downloaded)=>{
if(err){
console.log(err);
}elseif(downloaded){
console.log(`Completedthedownloadof${filename}`);
}else{
console.log(`${filename}wasalreadydownloaded`);
}
});
上边的代码的流程大概是这样的:
- 把url转换成filename
- 判断该文件名是否存在,若存在直接返回,否则进入下一步
- 发请求,获取body
- 把body写入到文件中
这是一个非常简单版本的蜘蛛,他只能抓取一个url的内容,看到上边的回调多么令人头疼。那么我们开始进行优化。
首先,ifelse这种方式可以进行优化,这个很简单,不用多说,放一个对比效果:
///before
if(err){
callback(err);
}else{
callback(null,filename,true);
}
///after
if(err){
returncallback(err);
}
callback(null,filename,true);
代码这么写,嵌套就会少一层,但经验丰富的程序员会认为,这样写过重强调了error,我们编程的重点应该放在处理正确的数据上,在可读性上也存在这样的要求。
另一个优化是函数拆分,上边代码中的spider函数中,可以把下载文件和保存文件拆分出去。
(spider_v2.js)
constrequest=require("request");
constfs=require("fs");
constmkdirp=require("mkdirp");
constpath=require("path");
constutilities=require("./utilities");
functionsaveFile(filename,contents,callback){
mkdirp(path.dirname(filename),err=>{
if(err){
returncallback(err);
}
fs.writeFile(filename,contents,callback);
});
}
functiondownload(url,filename,callback){
console.log(`Downloading${url}`);
request(url,(err,response,body)=>{
if(err){
returncallback(err);
}
saveFile(filename,body,err=>{
if(err){
returncallback(err);
}
console.log(`Downloadedandsaved:${url}`);
callback(null,body);
});
})
}
functionspider(url,callback){
constfilename=utilities.urlToFilename(url);
console.log(`filename:${filename}`);
fs.exists(filename,exists=>{
if(exists){
returncallback(null,filename,false);
}
download(url,filename,err=>{
if(err){
returncallback(err);
}
callback(null,filename,true);
})
});
}
spider(process.argv[2],(err,filename,downloaded)=>{
if(err){
console.log(err);
}elseif(downloaded){
console.log(`Completedthedownloadof${filename}`);
}else{
console.log(`${filename}wasalreadydownloaded`);
}
});
上边的代码基本上是采用原生优化后的结果,但这个蜘蛛的功能太过简单,我们现在需要抓取某个网页中的所有url,这样才会引申出串行和并行的问题。
(spider_v3.js)
constrequest=require("request");
constfs=require("fs");
constmkdirp=require("mkdirp");
constpath=require("path");
constutilities=require("./utilities");
functionsaveFile(filename,contents,callback){
mkdirp(path.dirname(filename),err=>{
if(err){
returncallback(err);
}
fs.writeFile(filename,contents,callback);
});
}
functiondownload(url,filename,callback){
console.log(`Downloading${url}`);
request(url,(err,response,body)=>{
if(err){
returncallback(err);
}
saveFile(filename,body,err=>{
if(err){
returncallback(err);
}
console.log(`Downloadedandsaved:${url}`);
callback(null,body);
});
})
}
///最大的启发是实现了如何异步循环遍历数组
functionspiderLinks(currentUrl,body,nesting,callback){
if(nesting===0){
returnprocess.nextTick(callback);
}
constlinks=utilities.getPageLinks(currentUrl,body);
functioniterate(index){
if(index===links.length){
returncallback();
}
spider(links[index],nesting-1,err=>{
if(err){
returncallback(err);
}
iterate((index+1));
})
}
iterate(0);
}
functionspider(url,nesting,callback){
constfilename=utilities.urlToFilename(url);
fs.readFile(filename,"utf8",(err,body)=>{
if(err){
if(err.code!=='ENOENT'){
returncallback(err);
}
returndownload(url,filename,(err,body)=>{
if(err){
returncallback(err);
}
spiderLinks(url,body,nesting,callback);
});
}
spiderLinks(url,body,nesting,callback);
});
}
spider(process.argv[2],2,(err,filename,downloaded)=>{
if(err){
console.log(err);
}elseif(downloaded){
console.log(`Completedthedownloadof${filename}`);
}else{
console.log(`${filename}wasalreadydownloaded`);
}
});
上边的代码相比之前的代码多了两个核心功能,首先是通过辅助类获取到了某个body中的links:
constlinks=utilities.getPageLinks(currentUrl,body);
内部实现就不解释了,另一个核心代码就是:
///最大的启发是实现了如何异步循环遍历数组
functionspiderLinks(currentUrl,body,nesting,callback){
if(nesting===0){
returnprocess.nextTick(callback);
}
constlinks=utilities.getPageLinks(currentUrl,body);
functioniterate(index){
if(index===links.length){
returncallback();
}
spider(links[index],nesting-1,err=>{
if(err){
returncallback(err);
}
iterate((index+1));
})
}
iterate(0);
}
可以说上边这一小段代码,就是采用原生实现异步串行的pattern了。除了这些之外,还引入了nesting的概念,通过这是这个属性,可以控制抓取层次。
到这里我们就完整的实现了串行的功能,考虑到性能,我们要开发并行抓取的功能。
(spider_v4.js)
constrequest=require("request");
constfs=require("fs");
constmkdirp=require("mkdirp");
constpath=require("path");
constutilities=require("./utilities");
functionsaveFile(filename,contents,callback){
mkdirp(path.dirname(filename),err=>{
if(err){
returncallback(err);
}
fs.writeFile(filename,contents,callback);
});
}
functiondownload(url,filename,callback){
console.log(`Downloading${url}`);
request(url,(err,response,body)=>{
if(err){
returncallback(err);
}
saveFile(filename,body,err=>{
if(err){
returncallback(err);
}
console.log(`Downloadedandsaved:${url}`);
callback(null,body);
});
})
}
///最大的启发是实现了如何异步循环遍历数组
functionspiderLinks(currentUrl,body,nesting,callback){
if(nesting===0){
returnprocess.nextTick(callback);
}
constlinks=utilities.getPageLinks(currentUrl,body);
if(links.length===0){
returnprocess.nextTick(callback);
}
letcompleted=0,hasErrors=false;
functiondone(err){
if(err){
hasErrors=true;
returncallback(err);
}
if(++completed===links.length&&!hasErrors){
returncallback();
}
}
links.forEach(link=>{
spider(link,nesting-1,done);
});
}
constspidering=newMap();
functionspider(url,nesting,callback){
if(spidering.has(url)){
returnprocess.nextTick(callback);
}
spidering.set(url,true);
constfilename=utilities.urlToFilename(url);
///Inthispattern,therewillbesomeissues.
///Possibleproblemstodownloadthesameurlagainandagain。
fs.readFile(filename,"utf8",(err,body)=>{
if(err){
if(err.code!=='ENOENT'){
returncallback(err);
}
returndownload(url,filename,(err,body)=>{
if(err){
returncallback(err);
}
spiderLinks(url,body,nesting,callback);
});
}
spiderLinks(url,body,nesting,callback);
});
}
spider(process.argv[2],2,(err,filename,downloaded)=>{
if(err){
console.log(err);
}elseif(downloaded){
console.log(`Completedthedownloadof${filename}`);
}else{
console.log(`${filename}wasalreadydownloaded`);
}
});
这段代码同样很简单,也有两个核心内容。一个是如何实现并发:
///最大的启发是实现了如何异步循环遍历数组
functionspiderLinks(currentUrl,body,nesting,callback){
if(nesting===0){
returnprocess.nextTick(callback);
}
constlinks=utilities.getPageLinks(currentUrl,body);
if(links.length===0){
returnprocess.nextTick(callback);
}
letcompleted=0,hasErrors=false;
functiondone(err){
if(err){
hasErrors=true;
returncallback(err);
}
if(++completed===links.length&&!hasErrors){
returncallback();
}
}
links.forEach(link=>{
spider(link,nesting-1,done);
});
}
上边的代码可以说是实现并发的一个pattern。利用循环遍历来实现。另一个核心是,既然是并发的,那么利用fs.exists就会存在问题,可能会重复下载同一文件,这里的解决方案是:
- 使用Map缓存某一url,url应该作为key
现在我们又有了新的需求,要求限制同时并发的最大数,那么在这里就引进了一个我认为最重要的概念:队列。
(task-Queue.js)
classTaskQueue{
constructor(concurrency){
this.concurrency=concurrency;
this.running=0;
this.queue=[];
}
pushTask(task){
this.queue.push(task);
this.next();
}
next(){
while(this.running{
this.running--;
this.next();
});
this.running++;
}
}
}
module.exports=TaskQueue;
上边的代码就是队列的实现代码,核心是next()方法,可以看出,当task加入队列中后,会立刻执行,这不是说这个任务一定马上执行,而是指的是next会立刻调用。
(spider_v5.js)
constrequest=require("request");
constfs=require("fs");
constmkdirp=require("mkdirp");
constpath=require("path");
constutilities=require("./utilities");
constTaskQueue=require("./task-Queue");
constdownloadQueue=newTaskQueue(2);
functionsaveFile(filename,contents,callback){
mkdirp(path.dirname(filename),err=>{
if(err){
returncallback(err);
}
fs.writeFile(filename,contents,callback);
});
}
functiondownload(url,filename,callback){
console.log(`Downloading${url}`);
request(url,(err,response,body)=>{
if(err){
returncallback(err);
}
saveFile(filename,body,err=>{
if(err){
returncallback(err);
}
console.log(`Downloadedandsaved:${url}`);
callback(null,body);
});
})
}
///最大的启发是实现了如何异步循环遍历数组
functionspiderLinks(currentUrl,body,nesting,callback){
if(nesting===0){
returnprocess.nextTick(callback);
}
constlinks=utilities.getPageLinks(currentUrl,body);
if(links.length===0){
returnprocess.nextTick(callback);
}
letcompleted=0,hasErrors=false;
links.forEach(link=>{
///给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数
///当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么
downloadQueue.pushTask(done=>{
spider(link,nesting-1,err=>{
///这里表示,只要发生错误,队列就会退出
if(err){
hasErrors=true;
returncallback(err);
}
if(++completed===links.length&&!hasErrors){
callback();
}
done();
});
});
});
}
constspidering=newMap();
functionspider(url,nesting,callback){
if(spidering.has(url)){
returnprocess.nextTick(callback);
}
spidering.set(url,true);
constfilename=utilities.urlToFilename(url);
///Inthispattern,therewillbesomeissues.
///Possibleproblemstodownloadthesameurlagainandagain。
fs.readFile(filename,"utf8",(err,body)=>{
if(err){
if(err.code!=='ENOENT'){
returncallback(err);
}
returndownload(url,filename,(err,body)=>{
if(err){
returncallback(err);
}
spiderLinks(url,body,nesting,callback);
});
}
spiderLinks(url,body,nesting,callback);
});
}
spider(process.argv[2],2,(err,filename,downloaded)=>{
if(err){
console.log(`error:${err}`);
}elseif(downloaded){
console.log(`Completedthedownloadof${filename}`);
}else{
console.log(`${filename}wasalreadydownloaded`);
}
});
因此,为了限制并发的个数,只需在spiderLinks方法中,把task遍历放入队列就可以了。这相对来说很简单。
到这里为止,我们使用原生JavaScript实现了一个有相对完整功能的网络蜘蛛,既能串行,也能并发,还可以控制并发个数。
2.使用async库
把不同的功能放到不同的函数中,会给我们带来巨大的好处,async库十分流行,它的性能也不错,它内部基于callback。
(spider_v6.js)
constrequest=require("request");
constfs=require("fs");
constmkdirp=require("mkdirp");
constpath=require("path");
constutilities=require("./utilities");
constseries=require("async/series");
consteachSeries=require("async/eachSeries");
functiondownload(url,filename,callback){
console.log(`Downloading${url}`);
letbody;
series([
callback=>{
request(url,(err,response,resBody)=>{
if(err){
returncallback(err);
}
body=resBody;
callback();
});
},
mkdirp.bind(null,path.dirname(filename)),
callback=>{
fs.writeFile(filename,body,callback);
}
],err=>{
if(err){
returncallback(err);
}
console.log(`Downloadedandsaved:${url}`);
callback(null,body);
});
}
///最大的启发是实现了如何异步循环遍历数组
functionspiderLinks(currentUrl,body,nesting,callback){
if(nesting===0){
returnprocess.nextTick(callback);
}
constlinks=utilities.getPageLinks(currentUrl,body);
if(links.length===0){
returnprocess.nextTick(callback);
}
eachSeries(links,(link,cb)=>{
"usestrict";
spider(link,nesting-1,cb);
},callback);
}
constspidering=newMap();
functionspider(url,nesting,callback){
if(spidering.has(url)){
returnprocess.nextTick(callback);
}
spidering.set(url,true);
constfilename=utilities.urlToFilename(url);
fs.readFile(filename,"utf8",(err,body)=>{
if(err){
if(err.code!=='ENOENT'){
returncallback(err);
}
returndownload(url,filename,(err,body)=>{
if(err){
returncallback(err);
}
spiderLinks(url,body,nesting,callback);
});
}
spiderLinks(url,body,nesting,callback);
});
}
spider(process.argv[2],1,(err,filename,downloaded)=>{
if(err){
console.log(err);
}elseif(downloaded){
console.log(`Completedthedownloadof${filename}`);
}else{
console.log(`${filename}wasalreadydownloaded`);
}
});
在上边的代码中,我们只使用了async的三个功能:
constseries=require("async/series");//串行
consteachSeries=require("async/eachSeries");//并行
constqueue=require("async/queue");//队列
由于比较简单,就不做解释了。async中的队列的代码在(spider_v7.js)中,和上边我们自定义的队列很相似,也不做更多解释了。
3.Promise
Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。
其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格
(spider_v8.js)
constutilities=require("./utilities");
constrequest=utilities.promisify(require("request"));
constfs=require("fs");
constreadFile=utilities.promisify(fs.readFile);
constwriteFile=utilities.promisify(fs.writeFile);
constmkdirp=utilities.promisify(require("mkdirp"));
constpath=require("path");
functionsaveFile(filename,contents,callback){
mkdirp(path.dirname(filename),err=>{
if(err){
returncallback(err);
}
fs.writeFile(filename,contents,callback);
});
}
functiondownload(url,filename){
console.log(`Downloading${url}`);
letbody;
returnrequest(url)
.then(response=>{
"usestrict";
body=response.body;
returnmkdirp(path.dirname(filename));
})
.then(()=>writeFile(filename,body))
.then(()=>{
"usestrict";
console.log(`Downloadedadnsaved:${url}`);
returnbody;
});
}
///promise编程的本质就是为了解决在函数中设置回调函数的问题
///通过中间层promise来实现异步函数同步化
functionspiderLinks(currentUrl,body,nesting){
letpromise=Promise.resolve();
if(nesting===0){
returnpromise;
}
constlinks=utilities.getPageLinks(currentUrl,body);
links.forEach(link=>{
"usestrict";
promise=promise.then(()=>spider(link,nesting-1));
});
returnpromise;
}
functionspider(url,nesting){
constfilename=utilities.urlToFilename(url);
returnreadFile(filename,"utf8")
.then(
body=>spiderLinks(url,body,nesting),
err=>{
"usestrict";
if(err.code!=='ENOENT'){
///抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误
throwerr;
}
returndownload(url,filename)
.then(body=>spiderLinks(url,body,nesting));
}
);
}
spider(process.argv[2],1)
.then(()=>{
"usestrict";
console.log('Downloadcomplete');
})
.catch(err=>{
"usestrict";
console.log(err);
});
可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。
在设计api的时候,应该支持两种方式,及支持callback,又支持promise
functionasyncDivision(dividend,divisor,cb){
returnnewPromise((resolve,reject)=>{
"usestrict";
process.nextTick(()=>{
constresult=dividend/divisor;
if(isNaN(result)||!Number.isFinite(result)){
consterror=newError("Invalidoperands");
if(cb){
cb(error);
}
returnreject(error);
}
if(cb){
cb(null,result);
}
resolve(result);
});
});
}
asyncDivision(10,2,(err,result)=>{
"usestrict";
if(err){
returnconsole.log(err);
}
console.log(result);
});
asyncDivision(22,11)
.then((result)=>console.log(result))
.catch((err)=>console.log(err));
4.Generator
Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。
(spider_v9.js)
constthunkify=require("thunkify");
constco=require("co");
constpath=require("path");
constutilities=require("./utilities");
constrequest=thunkify(require("request"));
constfs=require("fs");
constmkdirp=thunkify(require("mkdirp"));
constreadFile=thunkify(fs.readFile);
constwriteFile=thunkify(fs.writeFile);
constnextTick=thunkify(process.nextTick);
function*download(url,filename){
console.log(`Downloading${url}`);
constresponse=yieldrequest(url);
console.log(response);
constbody=response[1];
yieldmkdirp(path.dirname(filename));
yieldwriteFile(filename,body);
console.log(`Downloadedandsaved${url}`);
returnbody;
}
function*spider(url,nesting){
constfilename=utilities.urlToFilename(url);
letbody;
try{
body=yieldreadFile(filename,"utf8");
}catch(err){
if(err.code!=='ENOENT'){
throwerr;
}
body=yielddownload(url,filename);
}
yieldspiderLinks(url,body,nesting);
}
function*spiderLinks(currentUrl,body,nesting){
if(nesting===0){
returnnextTick();
}
constlinks=utilities.getPageLinks(currentUrl,body);
for(leti=0;i
总结
我并没有写promise和generator并发的代码。以上这些内容来自于这本书nodejs-design-patterns。
demo下载
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。