Node.js中你不可不精的Stream(流)
一、什么是Stream(流)
流(stream)在Node.js中是处理流数据的抽象接口(abstractinterface)。stream模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。例如,HTTP请求和process.stdout就都是流的实例。
流可以是可读的、可写的,或是可读写的。注意,所有的流都是EventEmitter的实例。
二、流的类型
Node.js中有四种基本的流类型:
- Readable-可读的流(例如fs.createReadStream())。
- Writable-可写的流(例如fs.createWriteStream())。
- Duplex-可读写的流(双工流)(例如net.Socket)。
- Transform-在读写过程中可以修改和变换数据的Duplex流(例如zlib.createDeflate())。
varStream=require('stream')//stream模块引入方式 varReadable=Stream.Readable//可读的流 varWritable=Stream.Writable//可写的流 varDuplex=Stream.Duplex//可读写的流 varTransform=Stream.Transform//在读写过程中可以修改和变换数据的Duplex流
Node.js中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。例如在fs.createReadStream()和fs.createWriteStream()的源码实现里,都调用了Stream模块提供的抽象接口来实现对流数据的操作。
三、为什么使用Stream?
我们通过两个例子,了解一下为什么要使用Stream。
Exp1:
下面是一个读取文件内容的例子:
constfs=require('fs') fs.readFile(file,function(err,content){//读出来的content是Buffer console.log(content) console.log(content.toString()) })
但如果文件内容较大,譬如在500M时,执行上述代码的输出为:
buffer.js:382 thrownewError('toStringfailed'); ^ Error:toStringfailed atBuffer.toString(buffer.js:382:11)
报错的原因是content这个Buffer对象的长度过大,导致toString方法失败。
可见,这种一次获取全部内容的做法,不适合操作大文件。
可以考虑使用流来读取文件内容。
varfs=require('fs') fs.createReadStream(bigFile).pipe(process.stdout)
fs.createReadStream创建一个可读流,连接了源头(上游,文件)和消耗方(下游,标准输出)。
执行上面代码时,流会逐次调用fs.read(ReadStream这个类的源码里有一个_read方法,这个_read方法在内部调用了fs.read来实现对文件的读取),将文件中的内容分批取出传给下游。
在文件看来,它的内容被分块地连续取走了。
在下游看来,它收到的是一个先后到达的数据序列。
如果不需要一次操作全部内容,它可以处理完一个数据便丢掉。
在流看来,任一时刻它都只存储了文件中的一部分数据,只是内容在变化而已。
这种情况就像是用水管去取池子中的水。
每当用掉一点水,水管便会从池子中再取出一点。
无论水池有多大,都只存储了与水管容积等量的水。
Exp2:
下面是一个在线看视频的例子,假定我们通过HTTP请求返回视频内容给用户
consthttp=require('http'); constfs=require('fs'); http.createServer((req,res)=>{ fs.readFile(videoPath,(err,data)=>{ res.end(data); }); }).listen(8080);
但这样有两个明显的问题
- 视频文件需要全部读取完,才能返回给用户,这样等待时间会很长。
- 视频文件一次全放入内存中,内存吃不消。
用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了HTTP协议的Transfer-Encoding:chunked分段传输特性),用户体验得到优化,同时对内存的开销明显下降。
consthttp=require('http'); constfs=require('fs'); http.createServer((req,res)=>{ fs.createReadStream(videoPath).pipe(res); }).listen(8080);
通过上述两个例子,我们知道,在大数据情况下必须使用流式处理。
四、可读流(ReadableStream)
可读流(Readablestreams)是对提供数据的源头(source)的抽象。
常见的可读流:
- HTTPresponses,ontheclient
- HTTPrequests,ontheserver
- fsreadstreams
- TCPsockets//sockets是一个双工流,即可读可写的流
- process.stdin//标准输入
所有的ReadableStream都实现了stream.Readable类定义的接口。
可读流的两种模式(flowing和paused)
- 在flowing模式下,可读流自动从系统底层读取数据,并通过EventEmitter接口的事件尽快将数据提供给应用(所有的流都是EventEmitter的实例)。
- 在paused模式下,必须显式调用stream.read()方法来从流中读取数据片段。
创建流的Readable流,默认是非流动模式(paused模式),默认不会读取数据。所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:
- 监听'data'事件
- 调用stream.resume()方法
- 调用stream.pipe()方法将数据发送到Writable
fs.createReadStream(path[,options])源码实现
//文件名ReadStream.js letfs=require('fs');//读取文件 letEventEmitter=require('events'); classReadStreamextendsEventEmitter{//流操作都是基于事件的 constructor(path,options={}){ super(); //需要的参数 this.path=path;//读取文件的路径 this.highWaterMark=options.highWaterMark||64*1024;//缓冲区大小,默认64KB this.autoClose=options.autoClose||true;//是否需要自动关闭文件描述符,默认为true this.start=options.start||0;//options可以包括start和end值,使其可以从文件读取一定范围的字节而不是整个文件 this.pos=this.start;//从文件的那个位置开始读取内容,pos会随着读取的位置而改变 this.end=options.end||null;//null表示没传递 this.encoding=options.encoding||null; this.flags=options.flags||'r';//以何种方式操作文件 //参数的问题 this.flowing=null;//默认为非流动模式 //建一个buffer存放读出来的数据 this.buffer=Buffer.alloc(this.highWaterMark); this.open(); //{newListener:[fn]} //次方法默认同步调用的 this.on('newListener',(type)=>{//等待着它监听data事件 if(type==='data'){//当监听到data事件时,把流设置为流动模式 this.flowing=true; this.read();//开始读取客户已经监听了data事件 } }) } pause(){//将流从flowing模式切换为paused模式 this.flowing=false; } resume(){//将流从paused模式切换为flowing模式 this.flowing=true; this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容 } read(){//默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读 if(typeofthis.fd!=='number'){//如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了 returnthis.once('open',()=>this.read());//等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法 } //当获取到fd时开始读取文件了 //第一次应该读2个第二次应该读2个 //第二次pos的值是4end是4 //读取文件里一共4有个数为1234,我们读取里面的1234 lethowMuchToRead=this.end?Math.min(this.end-this.pos+1,this.highWaterMark):this.highWaterMark;//规定每次读取多少个字节 fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(error,byteRead)=>{//byteRead为真实的读到了几个字节的内容 //读取完毕 this.pos+=byteRead;//读出来两个,pos位置就往后移两位 //this.buffer默认就是三个 letb=this.encoding?this.buffer.slice(0,byteRead).toString(this.encoding):this.buffer.slice(0,byteRead);//对读出来的内容进行编码 this.emit('data',b);//触发data事件,将读到的内容输出给用户 if((byteRead===this.highWaterMark)&&this.flowing){ returnthis.read();//继续读 } //这里就是没有更多的逻辑了 if(byteRead{ //如果文件打开过了那就关闭文件并且触发close事件 this.emit('close'); }); } open(){ fs.open(this.path,this.flags,(err,fd)=>{//fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型) if(err){ if(this.autoClose){//如果需要自动关闭我再去销毁fd this.destroy();//销毁(关闭文件,触发关闭事件) } this.emit('error',err);//如果有错误触发error事件 return; } this.fd=fd;//保存文件描述符 this.emit('open',this.fd);//文件被打开了,触发文件被打开的方法 }); } pipe(dest){//管道流的实现pipe()方法是ReadStream下的方法,它里面的参数是WritableStream this.on('data',(data)=>{ letflag=dest.write(data); if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值 this.pause();//已经不能继续写了,等他写完了再恢复 } }); dest.on('drain',()=>{//当读取缓存区清空后 console.log('写一下停一下') this.resume();//继续往dest写入数据 }); } } module.exports=ReadStream;//导出可读流
使用fs.createReadStream()
//流:有序的有方向的,可以自己控制速率 //读:读是将内容读取到内存中 //写:写是将内存或者文件的内容写入到文件内 //读取的时候默认读默认一次读取64k,encoding读取出来的内容默认都是buffer //letfs=require('fs'); //letrs=fs.createReadStream({...});//原生实现可读流 letReadStream=require('./ReadStream'); letrs=newReadStream('./2.txt',{ highWaterMark:3,//字节 flags:'r',//读文件 autoClose:true,//默认读取完毕后自动关闭文件描述符 start:0, //end:3,//流是闭合区间包start也包end encoding:'utf8' }); //默认创建一个流是非流动模式(上述源码中有写的),默认不会读取数据 //如果我们需要接收数据,那我们要监听data事件,这样数据会自动的流出来 rs.on('error',function(err){//通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。 console.log(err) }); rs.on('open',function(){//文件被打开了,获取到了fd。内部会自动的触发这个事件rs.emit('data'); console.log('文件打开了'); }); rs.on('data',function(data){//有数据流出来了 console.log(data); rs.pause();//暂停触发on('data')事件,将流动模式又转化成了非流动模式 }); setTimeout(()=>{rs.resume()},3000);//三秒钟之后再将非流动模式转化为流动模式 rs.on('end',function(){//读取完毕 console.log('读取完毕了'); }); rs.on('close',function(){//close事件将在流或其底层资源(比如一个文件)关闭后触发。close事件触发后,该流将不会再触发任何事件。 //console.log('关闭') });
四、可写流(WritableStream)
可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者TCP、HTTP等网络响应。
常见的可写流:
- HTTPrequests,ontheclient
- HTTPresponses,ontheserver
- fswritestreams
- zlibstreams
- cryptostreams
- TCPsockets
- childprocessstdin
- process.stdout,process.stderr
所有Writable流都实现了stream.Writable类定义的接口。
可写流的使用
调用可写流实例的write()方法就可以把数据写入可写流
constfs=require('fs'); constrs=fs.createReadStream(sourcePath); constws=fs.createWriteStream(destPath); rs.setEncoding('utf-8');//设置编码格式 rs.on('data',chunk=>{ ws.write(chunk);//写入数据 });
监听了可读流的data事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的write()方法,这样数据就被写入了可写流抽象的设备destPath中。
write()方法有三个参数
- chunk{String|Buffer},表示要写入的数据
- encoding当写入的数据是字符串的时候可以设置编码
- callback数据被写入之后的回调函数
drain事件
如果调用stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。
constfs=require('fs'); constrs=fs.createReadStream(sourcePath); constws=fs.createWriteStream(destPath); rs.setEncoding('utf-8');//设置编码格式 rs.on('data',chunk=>{ letflag=ws.write(chunk);//写入数据 if(!flag){//如果缓存区已满暂停读取 rs.pause(); } }); ws.on('drain',()=>{ rs.resume();//缓存区已清空继续读取写入 });
fs.createWriteStream(path[,options])源码实现
//文件WriteStream.js letfs=require('fs'); letEventEmitter=require('events'); classWriteStreamextendsEventEmitter{ constructor(path,options={}){ super(); this.path=path; this.flags=options.flags||'w'; this.encoding=options.encoding||'utf8'; this.start=options.start||0; this.pos=this.start; this.mode=options.mode||0o666; this.autoClose=options.autoClose||true; this.highWaterMark=options.highWaterMark||16*1024; this.open();//fd异步的//触发一个open事件,当触发open事件后fd肯定就存在了 //写文件的时候需要的参数有哪些 //第一次写入是真的往文件里写 this.writing=false;//默认第一次就不是正在写入 //用简单的数组来模拟一下缓存 this.cache=[]; //维护一个变量,表示缓存的长度 this.len=0; //是否触发drain事件 this.needDrain=false; } clearBuffer(){ letbuffer=this.cache.shift(); if(buffer){//如果缓存里有 this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer()); }else{//如果缓存里没有了 if(this.needDrain){//需要触发drain事件 this.writing=false;//告诉下次直接写就可以了不需要写到内存中了 this.needDrain=false; this.emit('drain'); } } } _write(chunk,encoding,clearBuffer){//因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作 if(typeofthis.fd!='number'){ returnthis.once('open',()=>this._write(chunk,encoding,clearBuffer)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.pos+=byteWritten; this.len-=byteWritten;//每次写入后就要在内存中减少一下 clearBuffer();//第一次就写完了 }) } write(chunk,encoding=this.encoding){//客户调用的是write方法去写入内容 //要判断chunk必须是buffer或者字符串为了统一,如果传递的是字符串也要转成buffer chunk=Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); this.len+=chunk.length;//维护缓存的长度3 letret=this.lenthis.clearBuffer());//专门实现写的方法 } returnret;//能不能继续写了,false表示下次的写的时候就要占用更多内存了 } destroy(){ if(typeofthis.fd!='number'){ this.emit('close'); }else{ fs.close(this.fd,()=>{ this.emit('close'); }); } } open(){ fs.open(this.path,this.flags,this.mode,(err,fd)=>{ if(err){ this.emit('error',err); if(this.autoClose){ this.destroy();//如果自动关闭就销毁文件描述符 } return; } this.fd=fd; this.emit('open',this.fd); }); } } module.exports=WriteStream;
使用fs.createWriteStream()
//可写流有缓存区的概念 //1.第一次写入是真的向文件里写,第二次在写入的时候是放到了缓存区里 //2.写入时会返回一个boolean类型,返回为false时表示缓存区满了,不要再写入了 //3.当内存和正在写入的内容消耗完后,会触发一个drain事件 //letfs=require('fs'); //letrs=fs.createWriteStream({...});//原生实现可写流 letWS=require('./WriteStream') letws=newWS('./2.txt',{ flags:'w',//写入文件,默认文件不存在会创建 highWaterMark:1,//设置当前缓存区的大小 encoding:'utf8',//文件里存放的都是二进制 start:0, autoClose:true,//自动关闭文件描述符 mode:0o666,//可读可写 }); //drain的触发时机,只有当highWaterMark填满时,才可能触发drain //当嘴里的和地下的都吃完了,就会触发drain方法 leti=9; functionwrite(){ letflag=true; while(flag&&i>=0){ i--; flag=ws.write('111');//987//654//321//0 console.log(flag) } } write(); ws.on('drain',function(){ console.log('dry'); write(); });
总结
stream(流)分为可读流(flowingmode和pausedmode)、可写流、可读写流,Node.js提供了多种流对象。例如,HTTP请求和process.stdout就都是流的实例。stream模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。它们底层都调用了stream模块并进行封装。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。