node.js中stream流中可读流和可写流的实现与使用方法实例分析
本文实例讲述了node.js中stream流中可读流和可写流的实现与使用方法。分享给大家供大家参考,具体如下:
node.js中的流stream是处理流式数据的抽象接口。node.js提供了很多流对象,像http中的request和response,和process.stdout都是流的实例。
流可以是可读的,可写的,或是可读可写的。所有流都是events的实例。
一、流的类型
node.js中有四种基本流类型:
1、Writable可写流(例:fs.createWriteStream())
2、Readable可读流(例:fs.createReadStream())
3、Duplex可读又可写流(例:net.Socket)
4、Transform读写过程中可修改或转换数据的Duplex流(例:zlib.createDeflate())
二、流中的数据有两种模式
1、二进制模式,都是string字符串 和Buffer。
2、对象模式,流内部处理的是一系统普通对象。
三、可读流的两种模式
1、流动模式(flowing),数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序。
2、暂停模式(paused),必须显式的调用read()读取数据。
可读流都开始于暂停模式,可以通过如下方法切换到流动模式:
1、添加'data'事件回调。
2、调用resume()。
3、调用pipe()。
可读流通过如下方法切换回暂停模式:
1、如果没有管道目标,调用pause()。
2、如果有管道目标,移除所有管道目标,调用unpipe()移除多个管道目标。
四、创建可读流,并监听事件
constfs=require('fs'); //创建一个文件可读流 letrs=fs.createReadStream('./1.txt',{ //文件系统标志 flags:'r', //数据编码,如果调置了该参数,则读取的数据会自动解析 //如果没调置,则读取的数据会是Buffer //也可以通过rs.setEncoding()进行设置 encoding:'utf8', //文件描述符,默认为null fd:null, //文件权限 mode:0o666, //文件读取的开始位置 start:0, //文件读取的结束位置(包括结束位置) end:Infinity, //读取缓冲区的大小,默认64K highWaterMark:3 }); //文件被打开时触发 rs.on('open',function(){ console.log('文件打开'); }); //监听data事件,会让当前流切换到流动模式 //当流中将数据传给消费者后触发 //由于我们在上面配置了highWaterMark为3字节,所以下面会打印多次。 rs.on('data',function(data){ console.log(data); }); //流中没有数据可供消费者时触发 rs.on('end',function(){ console.log('数据读取完毕'); }); //读取数据出错时触发 rs.on('error',function(){ console.log('读取错误'); }); //当文件被关闭时触发 rs.on('close',function(){ console.log('文件关闭'); });
注意,'open'和'close'事件并不是所有流都会触发。
当们监听'data'事件后,系统会尽可能快的读取出数据。但有时候,我们需要暂停一下流的读取,操作其他事情。
这时候就需要用到pause()和resume()方法。
constfs=require('fs'); //创建一个文件可读流 letrs=fs.createReadStream('./1.txt',{ highWaterMark:3 }); rs.on('data',function(data){ console.log(`读取了${data.length}字节数据:${data.toString()}`); //使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。 rs.pause(); //等待3秒后,再恢复触发'data'事件,将流切换回流动模式。 setTimeout(function(){ rs.resume(); },3000); });
可读流的'readable'事件,当流中有数据可供读取时就触发。
注意当监听'readable'事件后,会导致流停止流动,需调用read()方法读取数据。
注意on('data'),on('readable'),pipe()不要混合使用,会导致不明确的行为。
constfs=require('fs'); letrs=fs.createReadStream('./1.txt',{ highWaterMark:1 }); //当流中有数据可供读取时就触发 rs.on('readable',function(){ letdata; //循环读取数据 //参数表示要读取的字节数 //如果可读的数据不足字节数,则返回缓冲区剩余数据 //如是没有指定字节数,则返回缓冲区中所有数据 while(data=rs.read()){ console.log(`读取到${data.length}字节数据`); console.log(data.toString()); } });
五、创建可写流,并监听事件
constfs=require('fs'); //创建一个文件可写流 letws=fs.createWriteStream('./1.txt',{ highWaterMark:3 }); //往流中写入数据 //参数一表示要写入的数据 //参数二表示编码方式 //参数三表示写入成功的回调 //缓冲区满时返回false,未满时返回true。 //由于上面我们设置的缓冲区大小为3字节,所以到写入第3个时,就返回了false。 console.log(ws.write('1','utf8')); console.log(ws.write('2','utf8')); console.log(ws.write('3','utf8')); console.log(ws.write('4','utf8')); functionwriteData(){ letcnt=9; returnfunction(){ letflag=true; while(cnt&&flag){ flag=ws.write(`${cnt}`); console.log('缓冲区中写入的字节数',ws.writableLength); cnt--; } }; } letwd=writeData(); wd(); //当缓冲区中的数据满的时候,应停止写入数据, //一旦缓冲区中的数据写入文件了,并清空了,则会触发'drain'事件,告诉生产者可以继续写数据了。 ws.on('drain',function(){ console.log('可以继续写数据了'); console.log('缓冲区中写入的字节数',ws.writableLength); wd(); }); //当流或底层资源关闭时触发 ws.on('close',function(){ console.log('文件被关闭'); }); //当写入数据出错时触发 ws.on('error',function(){ console.log('写入数据错误'); });
写入流的end()方法和'finish'事件监听
constfs=require('fs'); //创建一个文件可写流 letws=fs.createWriteStream('./1.txt',{ highWaterMark:3 }); //往流中写入数据 //参数一表示要写入的数据 //参数二表示编码方式 //参数三表示写入成功的回调 //缓冲区满时返回false,未满时返回true。 //由于上面我们设置的缓冲区大小为3字节,所以到写入第3个时,就返回了false。 console.log(ws.write('1','utf8')); console.log(ws.write('2','utf8')); console.log(ws.write('3','utf8')); console.log(ws.write('4','utf8')); //调用end()表明已经没有数据要被写入,在关闭流之前再写一块数据。 //如果传入了回调函数,则将作为'finish'事件的回调函数 ws.end('最后一点数据','utf8'); //调用end()且缓冲区数据都已传给底层系统时触发 ws.on('finish',function(){ console.log('写入完成'); });
写入流的cork()和uncork()方法,主要是为了解决大量小块数据写入时,内部缓冲可能失效,导致的性能下降。
constfs=require('fs'); letws=fs.createWriteStream('./1.txt',{ highWaterMark:1 }); //调用cork()后,会强制把所有写入的数据缓冲到内存中。 //不会因为写入的数据超过了highWaterMark的设置而写入到文件中。 ws.cork(); ws.write('1'); console.log(ws.writableLength); ws.write('2'); console.log(ws.writableLength); ws.write('3'); console.log(ws.writableLength); //将调用cork()后的缓冲数据都输出到目标,也就是写入文件中。 ws.uncork();
注意cork()的调用次数要与uncork()一致。
constfs=require('fs'); letws=fs.createWriteStream('./1.txt',{ highWaterMark:1 }); //调用一次cork()就应该写一次uncork(),两者要一一对应。 ws.cork(); ws.write('4'); ws.write('5'); ws.cork(); ws.write('6'); process.nextTick(function(){ //注意这里只调用了一次uncork() ws.uncork(); //只有调用同样次数的uncork()数据才会被输出。 ws.uncork(); });
六、可读流的pipe()方法
pipe()方法类似下面的代码,在可读流与可写流之前架起一座桥梁。
constfs=require('fs'); //创建一个可读流 letrs=fs.createReadStream('./1.txt',{ highWaterMark:3 }); //创建一个可写流 letws=fs.createWriteStream('./2.txt',{ highWaterMark:3 }); rs.on('data',function(data){ letflag=ws.write(data); console.log(`往可写流中写入${data.length}字节数据`); //如果写入缓冲区已满,则暂停可读流的读取 if(!flag){ rs.pause(); console.log('暂停可读流'); } }); //监控可读流数据是否读完 rs.on('end',function(){ console.log('数据已读完'); //如果可读流读完了,则调用end()表示可写流已写入完成 ws.end(); }); //如果可写流缓冲区已清空,可以再次写入,则重新打开可读流 ws.on('drain',function(){ rs.resume(); console.log('重新开启可读流'); });
我们用pipe()方法完成上面的功能。
constfs=require('fs'); //创建一个可读流 letrs=fs.createReadStream('./1.txt',{ highWaterMark:3 }); //创建一个可写流 letws=fs.createWriteStream('./2.txt',{ highWaterMark:3 }); letws2=fs.createWriteStream('./3.txt',{ highWaterMark:3 }); //绑定可写流到可读流,自动将可读流切换到流动模式,将可读流的所有数据推送到可写流。 rs.pipe(ws); //可以绑定多个可写流 rs.pipe(ws2);
我们也可以用unpipe()手动的解绑可写流。
constfs=require('fs'); //创建一个可读流 letrs=fs.createReadStream('./1.txt',{ highWaterMark:3 }); //创建一个可写流 letws=fs.createWriteStream('./2.txt',{ highWaterMark:3 }); letws2=fs.createWriteStream('./3.txt',{ highWaterMark:3 }); rs.pipe(ws); rs.pipe(ws2); //解绑可写流,如果参数没写,则解绑所有管道 setTimeout(function(){ rs.unpipe(ws2); },0);
希望本文所述对大家node.js程序设计有所帮助。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。