数据读写可以看作是事件模式(Event)的特例,不断发送的数据块好比一个个的事件。读数据是
事件,写数据是
- read
事件,而数据块是事件附带的信息。Node 为这类情况提供了一个特殊接口
- write
。
- Stream
"数据流"(stream)是处理系统缓存的一种方式。操作系统采用数据块(chunk)的方式读取数据,每收到一次数据,就存入缓存。Node 应用程序有两种缓存的处理方式,第一种是等到所有数据接收完毕,一次性从缓存读取,这就是传统的读取文件的方式;第二种是采用 "数据流" 的方式,收到一块数据,就读取一块,即在数据还没有接收完成时,就开始处理它。
第一种方式先将数据全部读入内存,然后处理,优点是符合直觉,流程非常自然,缺点是如果遇到大文件,要花很长时间,才能进入数据处理的步骤。第二种方式每次只读入数据的一小块,像 "流水" 一样,每当系统读入了一小块数据,就会触发一个事件,发出 "新数据块" 的信号。应用程序只要监听这个事件,就能掌握数据读取的进展,做出相应处理,这样就提高了程序的性能。
- var fs = require('fs');
- fs
- .createReadStream('./data/customers.csv')
- .pipe(process.stdout);
上面代码中,
方法就是以 "数据流" 的方式读取文件,这可以在文件还没有读取完的情况下,就输出到标准输出。这显然对大文件的读取非常有利。
- fs.createReadStream
Unix 操作系统从很早以前,就有 "数据流" 这个概念,它是不同进程之间传递数据的一种方式。管道命令(pipe)就起到在不同命令之间,连接数据流的作用。"数据流" 相当于把较大的数据拆成很小的部分,一个命令只要部署了数据流接口,就可以把一个流的输出接到另一个流的输入。Node 引入了这个概念,通过数据流接口为异步读写数据提供的统一接口,无论是硬盘数据、网络数据,还是内存数据,都可以采用这个接口读写。
数据流接口最大特点就是通过事件通信,具有
、
- readable
、
- writable
、
- drain
、
- data
、
- end
等事件,既可以读取数据,也可以写入数据。读写数据时,每读入(或写入)一段数据,就会触发一次
- close
事件,全部读取(或写入)完毕,触发
- data
事件。如果发生错误,则触发
- end
事件。
- error
一个对象只要部署了数据流接口,就可以从它读取数据,或者写入数据。Node 内部很多涉及 IO 处理的对象,都部署了 Stream 接口,下面就是其中的一些。
Stream 接口分成三类。
"可读数据流" 用来产生数据。它表示数据的来源,只要一个对象提供 "可读数据流",就表示你可以从其中读取数据。
- var Readable = require('stream').Readable;
- var rs = new Readable();
- rs.push('beep ');
- rs.push('boop\n');
- rs.push(null);
- rs.pipe(process.stdout);
上面代码产生了一个可写数据流,最后将其写入标注输出。可读数据流的
方法,用来将数据输入缓存。
- push
中的 null,用来告诉 rs,数据输入完毕。
- rs.push(null)
"可读数据流" 有两种状态:流动态和暂停态。处于流动态时,数据会尽快地从数据源导向用户的程序;处于暂停态时,必须显式调用
等指令,"可读数据流" 才会释放数据。刚刚新建的时候,"可读数据流" 处于暂停态。
- stream.read()
三种方法可以让暂停态转为流动态。
如果转为流动态时,没有 data 事件的监听函数,也没有 pipe 方法的目的地,那么数据将遗失。
以下两种方法可以让流动态转为暂停态。
注意,只移除 data 事件的监听函数,并不会自动引发数据流进入 "暂停态"。另外,存在 pipe 方法的目的地时,调用 pause 方法,并不能保证数据流总是处于暂停态,一旦那些目的地发出数据请求,数据流有可能会继续提供数据。
每当系统有新的数据,该接口可以监听到 data 事件,从而回调函数。
- var fs = require('fs');
- var readableStream = fs.createReadStream('file.txt');
- var data = '';
- readableStream.setEncoding('utf8');
- readableStream.on('data',
- function(chunk) {
- data += chunk;
- });
- readableStream.on('end',
- function() {
- console.log(data);
- });
上面代码中,fs 模块的 createReadStream 方法,是部署了 Stream 接口的文件读取方法。该方法对指定的文件,返回一个对象。该对象只要监听 data 事件,回调函数就能读到数据。
除了 data 事件,监听 readable 事件,也可以读到数据。
- var fs = require('fs');
- var readableStream = fs.createReadStream('file.txt');
- var data = '';
- var chunk;
- readableStream.setEncoding('utf8');
- readableStream.on('readable',
- function() {
- while ((chunk = readableStream.read()) !== null) {
- data += chunk;
- }
- });
- readableStream.on('end',
- function() {
- console.log(data)
- });
readable 事件表示系统缓冲之中有可读的数据,使用 read 方法去读出数据。如果没有数据可读,read 方法会返回 null。
"可读数据流" 除了 read 方法,还有以下方法。
一个数据流的
属性返回一个布尔值。如果数据流是一个仍然打开的可读数据流,就返回
- readable
,否则返回
- true
。
- false
read 方法从系统缓存读取并返回数据。如果读不到数据,则返回 null。
该方法可以接受一个整数作为参数,表示所要读取数据的数量,然后会返回该数量的数据。如果读不到足够数量的数据,返回 null。如果不提供这个参数,默认返回系统缓存之中的所有数据。
只在 "暂停态" 时,该方法才有必要手动调用。"流动态" 时,该方法是自动调用的,直到系统缓存之中的数据被读光。
- var readable = getReadableStreamSomehow();
- readable.on('readable', function() {
- var chunk;
- while (null !== (chunk = readable.read())) {
- console.log('got %d bytes of data', chunk.length);
- }
- });
如果该方法返回一个数据块,那么它就触发了 data 事件。
可读数据流的
方法,可以将数据放入可读数据流。
- _read
- var Readable = require('stream').Readable;
- var rs = Readable();
- var c = 97;
- rs._read = function () {
- rs.push(String.fromCharCode(c++));
- if (c > 'z'.charCodeAt(0)) rs.push(null);
- };
- rs.pipe(process.stdout);
运行结果如下。
- $ node read1.js
- abcdefghijklmnopqrstuvwxyz
调用该方法,会使得数据流返回指定编码的字符串,而不是缓存之中的二进制对象。比如,调用
,数据流会返回 UTF-8 字符串,调用
- setEncoding('utf8')
,数据流会返回 16 进制的字符串。
- setEncoding('hex')
的参数是字符串的编码方法,比如
- setEncoding
、
- utf8
、
- ascii
等。
- base64
该方法会正确处理多字节的字符,而缓存的方法
不会。所以如果想要从数据流读取字符串,应该总是使用该方法。
- buf.toString(encoding)
- var readable = getReadableStreamSomehow();
- readable.setEncoding('utf8');
- readable.on('data', function(chunk) {
- assert.equal(typeof chunk, 'string');
- console.log('got %d characters of string data', chunk.length);
- });
方法会使得 "可读数据流" 继续释放
- resume
事件,即转为流动态。
- data
- // 新建一个readable数据流
- var readable = getReadableStreamSomehow();
- readable.resume();
- readable.on('end', function(chunk) {
- console.log('数据流到达尾部,未读取任务数据');
- });
上面代码中,调用
方法使得数据流进入流动态,只定义
- resume
事件的监听函数,不定义
- end
事件的监听函数,表示不从数据流读取任何数据,只监听数据流到达尾部。
- data
方法使得流动态的数据流,停止释放
- pause
事件,转而进入暂停态。任何此时已经可以读到的数据,都将停留在系统缓存。
- data
- // 新建一个readable数据流
- var readable = getReadableStreamSomehow();
- readable.on('data',
- function(chunk) {
- console.log('读取 % d字节的数据', chunk.length);
- readable.pause();
- console.log('接下来的1秒内不读取数据');
- setTimeout(function() {
- console.log('数据恢复读取');
- readable.resume();
- },
- 1000);
- });
该方法返回一个布尔值,表示 "可读数据流" 被客户端手动暂停(即调用了 pause 方法),目前还没有调用 resume 方法。
- var readable = new stream.Readable
- readable.isPaused() // === false
- readable.pause()
- readable.isPaused() // === true
- readable.resume()
- readable.isPaused() // === false
pipe 方法是自动传送数据的机制,就像管道一样。它从 "可读数据流" 读出所有数据,将其写出指定的目的地。整个过程是自动的。
- src.pipe(dst)
pipe 方法必须在可读数据流上调用,它的参数必须是可写数据流。
- var fs = require('fs');
- var readableStream = fs.createReadStream('file1.txt');
- var writableStream = fs.createWriteStream('file2.txt');
- readableStream.pipe(writableStream);
上面代码使用 pipe 方法,将 file1 的内容写入 file2。整个过程由 pipe 方法管理,不用手动干预,所以可以将传送数据写得很简洁。
pipe 方法返回目的地的数据流,因此可以使用链式写法,将多个数据流操作连在一起。
- a.pipe(b).pipe(c).pipe(d)
- // 等同于
- a.pipe(b);
- b.pipe(c);
- c.pipe(d);
下面是一个例子。
- var fs = require('fs');
- var zlib = require('zlib');
- fs.createReadStream('input.txt.gz')
- .pipe(zlib.createGunzip())
- .pipe(fs.createWriteStream('output.txt'));
上面代码采用链式写法,先读取文件,然后进行压缩,最后输出。
下面的写法模拟了 Unix 系统的 cat 命令,将标准输出写入标准输入。
- process.stdin.pipe(process.stdout);
当来源地的数据流读取完成,默认会调用目的地的 end 方法,就不再能够写入。对 pipe 方法传入第二个参数
,可以让目的地的数据流保持打开。
- { end: false }
- reader.pipe(writer, {
- end: false
- });
- reader.on('end',
- function() {
- writer.end('Goodbye\n');
- });
上面代码中,目的地数据流默认不会调用 end 方法,只能手动调用,因此 "Goodbye" 会被写入。
该方法移除 pipe 方法指定的数据流目的地。如果没有参数,则移除所有的 pipe 方法目的地。如果有参数,则移除该参数指定的目的地。如果没有匹配参数的目的地,则不会产生任何效果。
- var readable = getReadableStreamSomehow();
- var writable = fs.createWriteStream('file.txt');
- readable.pipe(writable);
- setTimeout(function() {
- console.log('停止写入file.txt');
- readable.unpipe(writable);
- console.log('手动关闭file.txt的写入数据流');
- writable.end();
- }, 1000);
上面代码写入 file.txt 的时间,只有 1 秒钟,然后就停止写入。
下面代码中,
是一个 readable 数据流,它可以监听以下事件。
- s
- s.on('data', f); // 收到新的数据时,data事件就会发生,触发f()
- s.on('end', f); // 数据读取完以后,不会再收到数据了,end事件发生,触发f()
- s.on('error', f); // 发生错误时,error事件发生,触发f()
- s.readable // => true if it is a readable stream that is still open
- s.pause(); // Pause "data" events. For throttling uploads, e.g.
- s.resume(); // Resume again
- (1)readable
- readable事件在数据流能够向外提供数据时触发。
- ```javascript
- var readable = getReadableStreamSomehow();
- readable.on('readable', function() {
- // there is some data to read now
- });
下面是一个例子。
- process.stdin.on('readable', function () {
- var buf = process.stdin.read();
- console.dir(buf);
- });
上面代码将标准输入的数据读出。
read 方法接受一个整数作为参数,表示以多少个字节为单位进行读取。
- process.stdin.on('readable', function () {
- var buf = process.stdin.read(3);
- console.dir(buf);
- });
上面代码将以 3 个字节为单位进行输出内容。
(2)data
对于那些没有显式暂停的数据流,添加 data 事件监听函数,会将数据流切换到流动态,尽快向外提供数据。
- var readable = getReadableStreamSomehow();
- readable.on('data', function(chunk) {
- console.log('got %d bytes of data', chunk.length);
- });
(3)end
无法再读取到数据时,会触发 end 事件。也就是说,只有当前数据被完全读取完,才会触发 end 事件,比如不停地调用 read 方法。
- var readable = getReadableStreamSomehow();
- readable.on('data',
- function(chunk) {
- console.log('got % d bytes of data', chunk.length);
- });
- readable.on('end',
- function() {
- console.log('there will be no more data.');
- });
(4)close
数据源关闭时,close 事件被触发。并不是所有的数据流都支持这个事件。
(5)error
当读取数据发生错误时,error 事件被触发。
可读数据流又分成两种,一种是 pull 模式,自己拉数据,就好像用吸管吸水,只有你吸了,水才会上来;另一种是 push 模式,数据自动推送给你,就好像水从水龙头自动涌出来。如果监听
事件,那么自动激活 push 模式;如果自己从数据流读取数据,那就是在使用 pull 模式。
- data
任何对象都可以部署可读数据流的接口。
- var Readable = require('stream').Readable;
- var util = require('util');
- function MyObject(options) {
- if (! (this instanceof MyObject)) return new MyObject(options);
- if (! options) options = {};
- options.objectMode = true;
- Readable.call(this, options);
- }
- util.inherits(MyObject, Readable);
- MyObject.prototype._read = function read() {
- var self = this;
- someMethodGetData(function(err, data) {
- if (err) self.emit('error', err);
- else self.push(data);
- });
- };
上面代码中,构造函数
继承了读数据流的接口。
- MyObject
设为
- options.objectMode
,是为了设置数据流处理的是对象,而不是字符串或者 buffer。此外,还要在
- true
上部署
- MyObject.prototype
方法,每当数据流要读取数据,就会调用这个方法。在这个方法里面,我们取到数据,使用
- _read
将数据放进数据流。
- stream.push(data)
然后,
的实例就可以使用 "读数据流" 的接口了。
- MyObject
- var myObj = new MyObject();
- myObj.on('data', function(data) {
- console.log(data);
- });
上面是 push 模式,下面是 pull 模式。
- var myObj = new MyObject();
- var data = myObj.read();
也可以暂停 / 恢复读数据。
- myObj
- myObj.pause();
- setTimeout(function () {
- myObj.resume();
- }, 5000);
模块的
- fs
方法,就可以创建一个读取数据的数据流。
- createReadStream
- var fs = require('fs');
- var stream = fs.createReadStream('readme.txt');
- stream.setEncoding('utf8');
上面代码创建了一个文本文件
的数据流。由于这个数据流会当作文本处理,所以要用
- readme.txt
方法设定编码。
- setEncoding
然后,监听
事件,获取每一个数据块;监听
- data
事件,当数据传送结束,再统一处理。
- end
- var data = '';
- stream.on('data', function(chunk) {
- data += chunk;
- })
- stream.on('end', function() {
- console.log('Data length: %d', data.length);
- });
监听
事件,也可以取得与监听
- readable
事件同样的效果。
- data
- var data = '';
- stream.on('readable', function() {
- var chunk;
- while(chunk = stream.read()) {
- data += chunk;
- }
- });
数据流还有
和
- pause
方法,可以暂停和恢复数据传送。
- resume
- // 暂停
- stream.pause();
- // 1秒后恢复
- setTimeout(stream.resume(), 1000);
注意,数据流新建以后,默认状态是暂停,只有指定了
事件的回调函数,或者调用了
- data
方法,数据才会开发发送。
- resume
如果要同时使用
与
- readable
事件,可以像下面这样写。
- data
- stream.pause();
- var pulledData = '';
- var pushedData = '';
- stream.on('readable',
- function() {
- var chunk;
- while (chunk = stream.read()) {
- pulledData += chunk;
- }
- });
- stream.on('data',
- function(chunk) {
- pushedData += chunk;
- });
上面代码中,显式调用
方法,会使得
- pause
事件释放一个
- readable
事件,否则
- data
监听无效。
- data
如果觉得
事件和
- data
事件写起来太麻烦,Stream 接口还提供了
- end
方法,自动处理这两个事件。数据流通过
- pipe
方法,可以方便地导向其他具有 Stream 接口的对象。
- pipe
- var fs = require('fs');
- var zlib = require('zlib');
- fs.createReadStream('wow.txt')
- .pipe(zlib.createGzip())
- .pipe(process.stdout);
上面代码先打开文本文件
,然后压缩,再导向标准输出。
- wow.txt
- fs.createReadStream('wow.txt')
- .pipe(zlib.createGzip())
- .pipe(fs.createWriteStream('wow.gz'));
上面代码压缩文件
以后,又将其写回压缩文件。
- wow.txt
下面代码新建一个 Stream 实例,然后指定写入事件和终止事件的回调函数,再将其接到标准输入之上。
- var stream = require('stream');
- var Stream = stream.Stream;
- var ws = new Stream;
- ws.writable = true;
- ws.write = function(data) {
- console.log("input=" + data);
- }
- ws.end = function(data) {
- console.log("bye");
- }
- process.stdin.pipe(ws);
调用上面的脚本,会产生以下结果。
- $ node pipe_out.js
- hello
- input=hello
- ^d
- bye
上面代码调用脚本下,键入
,会输出
- hello
。然后按下
- input=hello
,会输出
- ctrl-d
。使用管道命令,可以看得更清楚。
- bye
- $ echo hello | node pipe_out.js
- input=hello
- bye
"可读数据流" 用来对外释放数据,"可写数据流" 则是用来接收数据。它允许你将数据写入某个目的地。它是数据写入的一种抽象,不同的数据目的地部署了这个接口以后,就可以用统一的方法写入。
以下是部署了可写数据流的一些场合。
只要调用
,就能将数据写入可读数据流。
- stream.write(o)
可以指定回调函数
- stream.write(payload, callback)
,一旦缓存中的数据释放(
- callback
),就会调用这个回调函数。
- payload
部署 "可写数据流",必须继承
,以及实现
- stream.Writable
方法。下面是一个例子,数据库的写入接口部署 "可写数据流" 接口。
- stream._write
- var Writable = require('stream').Writable;
- var util = require('util');
- module.exports = DatabaseWriteStream;
- function DatabaseWriteStream(options) {
- if (! (this instanceof DatabaseWriteStream))
- return new DatabaseWriteStream(options);
- if (! options) options = {};
- options.objectMode = true;
- Writable.call(this, options);
- }
- util.inherits(DatabaseWriteStream, Writable);
- DatabaseWriteStream.prototype._write = function write(doc, encoding, callback) {
- insertIntoDatabase(JSON.stringify(doc), callback);
- };
上面代码中,
方法执行实际的写入操作,它必须接受三个参数。
- _write
:要写入的数据块
- chunk
:如果写入的是字符串,必须字符串的编码
- encoding
:写入完成后或发生错误时的回调函数
- callback
下面是用法的例子。
- var DbWriteStream = require('. / db_write_stream');
- var db = DbWriteStream();
- var Thermometer = require('. / thermometer');
- var thermomether = Thermometer();
- thermomether.on('data',
- function(temp) {
- db.write({
- when: Date.now(),
- temperature: temp
- });
- });
下面是 fs 模块的可写数据流的例子。
- var fs = require('fs');
- var readableStream = fs.createReadStream('file1.txt');
- var writableStream = fs.createWriteStream('file2.txt');
- readableStream.setEncoding('utf8');
- readableStream.on('data', function(chunk) {
- writableStream.write(chunk);
- });
上面代码中,fs 模块的
方法针对特定文件,创建了一个 "可写数据流",本质上就是对写入操作部署了
- createWriteStream
接口。然后,"可写数据流" 的
- Stream
方法,可以将数据写入文件。
- write
属性返回一个布尔值。如果数据流仍然打开,并且可写,就返回
- writable
,否则返回
- true
。
- false
- s.writeable
方法用于向 "可写数据流" 写入数据。它接受两个参数,一个是写入的内容,可以是字符串,也可以是一个
- write
对象(比如可读数据流)或
- stream
对象(表示二进制数据),另一个是写入完成后的回调函数,它是可选的。
- buffer
来源: http://www.bubuko.com/infodetail-1954238.html