Nodejs Stream 数据流使用手册
1、介绍
本文介绍了使用node.jsstreams开发程序的基本方法。
<codeclass="hljsmizar">"Weshouldhavesomewaysofconnectingprogramslikegardenhose--screwin anothersegmentwhenitbecomesnecessarytomassagedatain anotherway.ThisisthewayofIOalso." DougMcIlroy.October11,1964</code>
最早接触Stream是从早期的unix开始的数十年的实践证明Stream思想可以很简单的开发出一些庞大的系统。在unix里,Stream是通过|实现的;在node中,作为内置的stream模块,很多核心模块和三方模块都使用到。和unix一样,nodeStream主要的操作也是.pipe(),使用者可以使用反压力机制来控制读和写的平衡。
Stream可以为开发者提供可以重复使用统一的接口,通过抽象的Stream接口来控制Stream之间的读写平衡。
2、为什么使用Stream
node中的I/O是异步的,因此对磁盘和网络的读写需要通过回调函数来读取数据,下面是一个文件下载服务器的简单代码:
<codeclass="hljsjavascript">varhttp=require('http'); varfs=require('fs'); varserver=http.createServer(function(req,res){ fs.readFile(__dirname+'/data.txt',function(err,data){ res.end(data); }); }); server.listen(8000);</code>
这些代码可以实现需要的功能,但是服务在发送文件数据之前需要缓存整个文件数据到内存,如果"data.txt"文件很大且并发量很大的话,会浪费很多内存。因为用户需要等到整个文件缓存到内存才能接受的文件数据,这样导致用户体验相当不好。不过还好(req,res)两个参数都是Stream,这样我们可以用fs.createReadStream()代替fs.readFile():
<codeclass="hljsjavascript">varhttp=require('http'); varfs=require('fs'); varserver=http.createServer(function(req,res){ varstream=fs.createReadStream(__dirname+'/data.txt'); stream.pipe(res); }); server.listen(8000);</code>
.pipe()方法监听fs.createReadStream()的'data'和'end'事件,这样"data.txt"文件就不需要缓存整个文件,当客户端连接完成之后马上可以发送一个数据块到客户端。使用.pipe()另一个好处是可以解决当客户端延迟非常大时导致的读写不平衡问题。如果想压缩文件再发送,可以使用三方模块实现:
<codeclass="hljsjavascript">varhttp=require('http'); varfs=require('fs'); varoppressor=require('oppressor'); varserver=http.createServer(function(req,res){ varstream=fs.createReadStream(__dirname+'/data.txt'); stream.pipe(oppressor(req)).pipe(res); }); server.listen(8000);</code>
这样文件就会对支持gzip和deflate的浏览器进行压缩。oppressor模块会处理所有的content-encoding。
Stream使开发程序变得简单。
3、基础概念
有五种基本的Stream:readable,writable,transform,duplex,and”classic”.
3-1、pipe
所有类型的Stream收是使用.pipe()来创建一个输入输出对,接收一个可读流src并将其数据输出到可写流dst,如下:
<codeclass="hljsperl">src.pipe(dst)</code>
.pipe(dst)方法为返回dst流,这样就可以接连使用多个.pipe(),如下:
<codeclass="hljsperl">a.pipe(b).pipe(c).pipe(d)</code>
功能与下面的代码相同:
<codeclass="hljsperl">a.pipe(b); b.pipe(c); c.pipe(d);</code>
3-2、readablestreams
通过调用Readablestreams的.pipe()方法可以把Readablestreams的数据写入一个Writable,Transform,或者Duplexstream。
<codeclass="hljsperl">readableStream.pipe(dst)</code>
1>创建readablestream
这里我们创建一个readablestream!
<codeclass="hljsperl">varReadable=require('stream').Readable; varrs=newReadable; rs.push('beep'); rs.push('boop\n'); rs.push(null); rs.pipe(process.stdout); $noderead0.js beepboop </code>
rs.push(null)通知数据接收者数据已经发送完毕.
注意到我们在将所有数据内容压入可读流之前并没有调用rs.pipe(process.stdout);,但是我们压入的所有数据内容还是完全的输出了,这是因为可读流在接收者没有读取数据之前,会缓存所有压入的数据。但是在很多情况下,更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面我们重写一下._read()函数:
<codeclass="hljsjavascript">varReadable=require('stream').Readable; varrs=Readable(); varc=97; rs._read=function(){ rs.push(String.fromCharCode(c++)); if(c>'z'.charCodeAt(0))rs.push(null); }; rs.pipe(process.stdout);</code> <codeclass="hljsbash">$noderead1.js abcdefghijklmnopqrstuvwxyz</code>
上面的代码通过重写_read()方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()方法也可以接收一个size参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。
注意我们也可以用util.inherits()继承可读流。为了说明只有在数据接受者请求数据时_read()方法才被调用,我们在向可读流压入数据时做一个延时,如下:
<codeclass="hljsjavascript">varReadable=require('stream').Readable; varrs=Readable(); varc=97-1; rs._read=function(){ if(c>='z'.charCodeAt(0))returnrs.push(null); setTimeout(function(){ rs.push(String.fromCharCode(++c)); },100); }; rs.pipe(process.stdout); process.on('exit',function(){ console.error('\n_read()called'+(c-97)+'times'); }); process.stdout.on('error',process.exit);</code>
用下面的命令运行程序我们发现_read()方法只调用了5次:
<codeclass="hljsbash">$noderead2.js|head-c5 abcde _read()called5times</code>
使用计时器的原因是系统需要时间来发送信号来通知程序关闭管道。使用process.stdout.on('error',fn)是为了处理系统因为header命令关闭管道而发送SIGPIPE信号,因为这样会导致process.stdout触发EPIPE事件。如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectMode为true即可,例如:Readable({objectMode:true})。
2>读取readablestream数据
大部分情况下我们只要简单的使用pipe方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许直接从可读流中读取数据更有用。如下:
<codeclass="hljsphp">process.stdin.on('readable',function(){ varbuf=process.stdin.read(); console.dir(buf); }); $(echoabc;sleep1;echodef;sleep1;echoghi)|nodeconsume0.js <buffer0a=""61=""62=""63=""> <buffer0a=""64=""65=""66=""> <buffer0a=""67=""68=""69=""> null</buffer></buffer></buffer></code>
当可读流中有数据可读取时,流会触发'readable'事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read()会返回null,这样就可以结束.read()的调用,等待下一次'readable'事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:
<codeclass="hljsjavascript">process.stdin.on('readable',function(){ varbuf=process.stdin.read(3); console.dir(buf); });</code>
如下运行程序发现,输出结果并不完全!
<codeclass="hljsbash">$(echoabc;sleep1;echodef;sleep1;echoghi)|nodeconsume1.js <buffer61=""62=""63=""> <buffer0a=""64=""65=""> <buffer0a=""66=""67=""></buffer></buffer></buffer></code>
这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。
<codeclass="hljsjavascript">process.stdin.on('readable',function(){ varbuf=process.stdin.read(3); console.dir(buf); process.stdin.read(0); });</code>
这次运行结果如下:
<codeclass="hljsxml">$(echoabc;sleep1;echodef;sleep1;echoghi)|nodeconsume2.js <buffer0a=""64=""65=""> <buffer0a=""68=""69=""></buffer></buffer></code>
我们可以使用.unshift()将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:
<codeclass="hljsjavascript">varoffset=0; process.stdin.on('readable',function(){ varbuf=process.stdin.read(); if(!buf)return; for(;offset<buf.length;offset++){ if(buf[offset]===0x0a){ console.dir(buf.slice(0,offset).toString()); buf=buf.slice(offset+1); offset=0; process.stdin.unshift(buf); return; } } process.stdin.unshift(buf); }); $tail-n+50000/usr/share/dict/american-english|head-n10|nodelines.js 'hearties' 'heartiest' 'heartily' 'heartiness' 'heartiness\'s' 'heartland' 'heartland\'s' 'heartlands' 'heartless' 'heartlessly'</code>
当然,有很多模块可以实现这个功能,如:split。
3-3、writablestreams
writablestreams只可以作为.pipe()函数的目的参数。如下代码:
<codeclass="hljsperl">src.pipe(writableStream);</code>
1>创建writablestream
重写._write(chunk,enc,next)方法就可以接受一个readablestream的数据。
<codeclass="hljsphp">varWritable=require('stream').Writable; varws=Writable(); ws._write=function(chunk,enc,next){ console.dir(chunk); next(); }; process.stdin.pipe(ws); $(echobeep;sleep1;echoboop)|nodewrite0.js <buffer0a=""62=""65=""70=""> <buffer0a=""62=""6f=""70=""></buffer></buffer></code>
第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readablestream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({decodeStrings:false})参数,那么不会做转换。如果readablestream写入的数据时对象,那么需要这样创建writablestream
<codeclass="hljscss">Writable({objectMode:true})</code>
2>写数据到writablestream
调用writablestream的.write(data)方法即可完成数据写入。
<codeclass="hljsvala">process.stdout.write('beepboop\n');</code>
调用.end()方法通知writablestream数据已经写入完成。
<codeclass="hljsjavascript">varfs=require('fs'); varws=fs.createWriteStream('message.txt'); ws.write('beep'); setTimeout(function(){ ws.end('boop\n'); },1000); $nodewriting1.js $catmessage.txt beepboop</code>
如果需要设置writablestream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writablestream会触发'drain'事件。
3-4、classicstreams
Classicstreams比较老的接口了,最早出现在node0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data"事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。
1>classicreadablestreams
Classicreadablestreams事件就是一个事件触发器,如果Classicreadablestreams有数据可读取,那么其触发"data"事件,等到数据读取完毕时,会触发"end"事件。.pipe()方法通过检查stream.readable的值确定流是否有数据可读。下面是一个使用Classicreadablestreams打印A-J字母的例子:
<codeclass="hljsjavascript">varStream=require('stream'); varstream=newStream; stream.readable=true; varc=64; variv=setInterval(function(){ if(++c>=75){ clearInterval(iv); stream.emit('end'); } elsestream.emit('data',String.fromCharCode(c)); },100); stream.pipe(process.stdout); $nodeclassic0.js ABCDEFGHIJ</code>
如果要从classicreadablestream中读取数据,注册"data"和"end"两个事件的回调函数即可,代码如下:
<codeclass="hljsphp">process.stdin.on('data',function(buf){ console.log(buf); }); process.stdin.on('end',function(){ console.log('__END__'); }); $(echobeep;sleep1;echoboop)|nodeclassic1.js <buffer0a=""62=""65=""70=""> <buffer0a=""62=""6f=""70=""> __END__</buffer></buffer></code>
需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data”和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用through代替自己监听”data”和”end”事件,如下面的代码:
<codeclass="hljsphp">varthrough=require('through'); process.stdin.pipe(through(write,end)); functionwrite(buf){ console.log(buf); } functionend(){ console.log('__END__'); } $(echobeep;sleep1;echoboop)|nodethrough.js <buffer0a=""62=""65=""70=""> <buffer0a=""62=""6f=""70=""> __END__</buffer></buffer></code>
或者也可以使用concat-stream来缓存整个流的内容:
<codeclass="hljsoxygene">varconcat=require('concat-stream'); process.stdin.pipe(concat(function(body){ console.log(JSON.parse(body)); })); $echo'{"beep":"boop"}'|nodeconcat.js {beep:'boop'}</code>
当然如果你非要自己监听"data"和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classicreadablestreams继续触发”data”事件。等到写数据的流可写的时候再使用.resume()方法通知流继续触发"data"事件继续读取
数据。
2>classicwritablestreams
Classicwritablestreams非常简单。只有.write(buf),.end(buf)和.destroy()三个方法。.end(buf)方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf);stream.end()这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。
4、transform
transform是一个对读入数据过滤然输出的流。
5、duplex
duplexstream是一个可读也可写的双向流,如下面的a就是一个duplexstream:
<codeclass="hljslivecodeserver">a.pipe(b).pipe(a)</code>
以上内容是小编给大家介绍的NodejsStream数据流使用手册,希望对大家有所帮助!