Stream 在 node.js 中是一个抽象的接口,基于 EventEmitter,也是一种 Buffer 的高级封装,用来处理流数据。流模块便是提供各种 API 让我们可以很简单的使用 Stream。
流分为四种类型,如下所示:
通过 stream.Readable 可创建一个可读流,它有两种模式:暂停和流动。
在流动模式下,将自动从下游系统读取数据并使用 data 事件输出;暂停模式下,必须显示调用
方法读取数据,并触发 data 事件。 所有的可读流最开始都是暂停模式,可以通过以下方法切换到流动模式:
- stream.read()
方法
- stream.resume()
方法将数据输出到一个可写流 Writable
- stream.pipe()
同样地,也可以切换到暂停模式,有两种方法:
方法即可。
- stream.pause()
方法
- stream.unpipe()
在 Readable 对象中有一个
的对象,通过该对象可以得知流当前处于什么模式,如下所示:
- _readableSate
为什么使用流取数据
对于小文件,使用
方法读取数据更方便,但需要读取大文件的时候,比如几 G 大小的文件,使用该方法将消耗大量的内存,甚至使程序崩溃。这种情况下,使用流来处理是更合适的,采用分段读取,便不会造成内存的'爆仓'问题。 data 事件 在 stream 提供数据块给消费者时触发,有可能是切换到流动模式的时候,也有可能是调用
- fs.readFile()
方法且有有效数据块的时候,使用如下所示:
- readable.read()
- const fs = require('fs');
- const rs = fs.createReadStream('./appbak.js');
- var chunkArr = [],
- chunkLen = 0;
- rs.on('data', (chunk) = >{
- chunkArr.push(chunk);
- chunkLen += chunk.length;
- });
- rs.on('end', (chunk) = >{
- console.log(Buffer.concat(chunkArr, chunkLen).toString());
- });
readable 事件
当流中有可用数据能被读取时触发,分为两种,新的可用的数据和到达流的末尾,前者
方法返回可用数据,后者返回 null,如下所示:
- stream.read()
- const rs = fs.createReadStream('./appbak.js');
- var chunkArr = [],
- chunkLen = 0;
- rs.on('readable', () = >{
- var chunk = null;
- //这里需要判断是否到了流的末尾
- if ((chunk = rs.read()) !== null) {
- chunkArr.push(chunk);
- chunkLen += chunk.length;
- }
- });
- rs.on('end', (chunk) = >{
- console.log(Buffer.concat(chunkArr, chunkLen).toString());
- });
pause 和 resume 方法
方法让流进入暂停模式,并停止'data'事件触发,
- stream.pause()
方法使流进入流动模式,并恢复'data'事件触发,也可以用来消费所有数据,如下所示:
- stream.resume()
- const rs = fs.createReadStream('./下载.png');
- rs.on('data', (chunk) = >{
- console.log(`接收到$ {
- chunk.length
- }字节数据...`);
- rs.pause();
- console.log(`数据接收将暂停1.5秒.`);
- setTimeout(() = >{
- rs.resume();
- },
- 1000);
- });
- rs.on('end', (chunk) = >{
- console.log(`数据接收完毕`);
- });
pipe(destination[, options]) 方法
方法绑定一个可写流到可读流上,并自动切换到流动模式,将所有数据输出到可写流,以及做好了数据流的管理,不会发生数据丢失的问题,使用如下所示:
- pipe()
- const rs = fs.createReadStream('./app.js');
- rs.pipe(process.stdout);
以上介绍了多种可读流的数据消费的方法,但对于一个可读流,最好只选择其中的一种,推荐使用
方法。
- pipe()
所有的可写流都是基于
类创建的,创建之后便可将数据写入该流中。 write(chunk[, encoding][, callback]) 方法
- stream.Writable
方法向可写流中写入数据,参数含义:
- write()
该方法的返回值为布尔值,如果为 false,则表示需要写入的数据块被缓存并且此时缓存的大小超出 highWaterMark 阀值,否则为 true。
使用如下所示:
- const ws = fs.createWriteStream('./test.txt');
- ws.write('nihao', 'utf8', () = >{
- process.stdout.write('this chunk is flushed.');
- });
- ws.end('done.')
背压机制
如果可写流的写入速度跟不上可读流的读取速度,write 方法添加的数据将被缓存,逐渐增多,导致占用大量内存。我们希望的是消耗一个数据,再去读取一个数据,这样内存就维持在一个水平上。如何做到这一点?可以利用 write 方法的返回值来判断可写流的缓存状态和'drain'事件,及时切换可读流的模式,如下所示:
- function copy(src, dest) {
- src = path.resolve(src);
- dest = path.resolve(dest);
- const rs = fs.createReadStream(src);
- const ws = fs.createWriteStream(dest);
- console.log('正在复制中...');
- const stime = +new Date();
- rs.on('data', (chunk) = >{
- if (null === ws.write(chunk)) {
- rs.pause();
- }
- });
- ws.on('drain', () = >{
- rs.resume();
- });
- rs.on('end', () = >{
- const etime = +new Date();
- console.log(`已完成,用时:$ { (etime - stime) / 1000
- }秒`);
- ws.end();
- });
- function calcProgress() {
- }
- }
- copy('./CSS权威指南 第3版.pdf', './javascript.pdf');
drain 事件
如果
方法返回 false,则 drain 事件将会被触发,上面的背压机制已经使用了该事件。 finish 事件 在调用
- Writable.write()
方法之后且所有缓存区的数据都被写入到下游系统,就会触发该事件,如下所示:
- stream.end()
- const ws = fs.createWriteStream('./alphabet.txt');
- const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
- ws.on('finish', () = >{
- console.log('done.');
- });
- for (let letter of alphabetStr.split()) {
- ws.write(letter);
- }
- ws.end(); //必须调用
end([chunk][, encoding][, callback]) 方法
方法被调用之后,便不能再调用
- end()
方法写入数据,负责将抛出错误。
- stream.write()
Duplex 流同时实现了 Readable 与 Writable 类的接口,既是可读流,也是可写流。例如'zlib streams'、'crypto streams'、'TCP sockets'等都是 Duplex 流。
Duplex 流的扩展,区别在于,Transform 流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是 Transform 流。
模块提供的 API 可以让我们很简单的实现流,该模块使用
- stream
引用,我们只要继承四种流中的一个基类
- require('stream')
,然后实现它的接口就可以了,需要实现的接口如下所示: | Use-case | Class | Method(s) to implement | | ------------- |-------------| -----| | Reading only | Readable | _read | | Writing only | Writable | _write, _writev | | Reading and writing | Duplex | _read, _write, _writev | | Operate on written data, then read the result | Transform | _transform, _flush |
- (stream.Writable, stream.Readable, stream.Duplex, or stream.Transform)
Readable 流实现
如上所示,我们只要继承 Readable 类并实现_read 接口即可,,如下所示:
- const Readable = require('stream').Readable;
- const util = require('util');
- const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
- /*function AbReadable(){
- if(!this instanceof AbReadable){
- return new AbReadable();
- }
- Readable.call(this);
- }
- util.inherits(AbReadable,Readable);
- AbReadable.prototype._read = function(){
- if(!alphabetArr.length){
- this.push(null);
- }else{
- this.push(alphabetArr.shift());
- }
- };
- const abReadable = new AbReadable();
- abReadable.pipe(process.stdout);*/
- /*class AbReadable extends Readable{
- constructor(){
- super();
- }
- _read(){
- if(!alphabetArr.length){
- this.push(null);
- }else{
- this.push(alphabetArr.shift());
- }
- }
- }
- const abReadable = new AbReadable();
- abReadable.pipe(process.stdout);*/
- /*const abReadable = new Readable({
- read(){
- if(!alphabetArr.length){
- this.push(null);
- }else{
- this.push(alphabetArr.shift());
- }
- }
- });
- abReadable.pipe(process.stdout);*/
- const abReadable = Readable();
- abReadable._read = function() {
- if (!alphabetArr.length) {
- this.push(null);
- } else {
- this.push(alphabetArr.shift());
- }
- }
- abReadable.pipe(process.stdout);
以上代码使用了四种方法创建一个 Readable 可读流,必须实现
方法,以及用到了
- _read()
方法,该方法的作用是将指定的数据添加到读取队列。 Writable 流实现 我们只要继承 Writable 类并实现_write 或_writev 接口,如下所示 (只使用两种方法):
- readable.push()
- /*class MyWritable extends Writable{
- constructor(){
- super();
- }
- _write(chunk,encoding,callback){
- process.stdout.write(chunk);
- callback();
- }
- }
- const myWritable = new MyWritable();*/
- const myWritable = new Writable({
- write(chunk, encoding, callback) {
- process.stdout.write(chunk);
- callback();
- }
- });
- myWritable.on('finish', () = >{
- process.stdout.write('done');
- }) myWritable.write('a');
- myWritable.write('b');
- myWritable.write('c');
- myWritable.end();
Duplex 流实现
实现 Duplex 流,需要继承 Duplex 类,并实现_read 和_write 接口,如下所示:
- class MyDuplex extends Duplex {
- constructor() {
- super();
- this.source = [];
- }
- _read() {
- if (!this.source.length) {
- this.push(null);
- } else {
- this.push(this.source.shift());
- }
- }
- _write(chunk, encoding, cb) {
- this.source.push(chunk);
- cb();
- }
- }
- const myDuplex = new MyDuplex();
- myDuplex.on('finish', () = >{
- process.stdout.write('write done.')
- });
- myDuplex.on('end', () = >{
- process.stdout.write('read done.')
- });
- myDuplex.write('\na\n');
- myDuplex.write('c\n');
- myDuplex.end('b\n');
- myDuplex.pipe(process.stdout);
上面的代码实现了
方法,可作为可读流来使用,同时实现了
- _read()
方法,又可作为可写流来使用。 Transform 流实现
- _write()
- class MyTransform extends Transform {
- constructor() {
- super();
- }
- _transform(chunk, encoding, callback) {
- chunk = (chunk + '').toUpperCase();
- callback(null, chunk);
- }
- }
- const myTransform = new MyTransform();
- myTransform.write('hello world!');
- myTransform.end();
- myTransform.pipe(process.stdout);
上面代码中的
方法,其第一个参数,要么为 error,要么为 null,第二个参数将被自动转发给
- _transform()
方法,因此该方法也可以使用如下写法:
- readable.push()
- _transform(chunk, encoding, callback) {
- chunk = (chunk + '').toUpperCase() this.push(chunk) callback();
- }
Object Mode 流实现
我们知道流中的数据默认都是 Buffer 类型,可读流的数据进入流中便被转换成 buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为 buffer。但将构造函数的 objectMode 选择设置为 true,便可产生原样的数据,如下所示:
- const rs = Readable();
- rs.push('a');
- rs.push('b');
- rs.push(null);
- rs.on('data', (chunk) = >{
- console.log(chunk);
- }); //<Buffer 61>与<Buffer 62>
- const rs1 = Readable({
- objectMode: !0
- });
- rs1.push('a');
- rs1.push('b');
- rs1.push(null);
- rs1.on('data', (chunk) = >{
- console.log(chunk);
- }); //a与b
下面利用 Transform 流实现一个简单的 CSS 压缩工具,如下所示:
- function minify(src, dest) {
- const transform = new Transform({
- transform(chunk, encoding, cb) {
- cb(null, (chunk.toString()).replace(/[\s\r\n\t]/g, ''));
- }
- });
- fs.createReadStream(src, {
- encoding: 'utf8'
- }).pipe(transform).pipe(fs.createWriteStream(dest));
- }
- minify('./reset.css', './reset.min.css');
来源: http://www.cnblogs.com/zmxmumu/p/6141482.html