一. node.js 中的流是什么
stream(流) 是 Node.js 提供的又一个仅在服务区端可用的模块, 流是一种抽象的数据结构 Stream 是一个抽象接口, Node 中有很多对象实现了这个接口例如, 对 http 服务器发起请求的 request 对象就是一个 Stream, 还有 stdout(标准输出流)
顾名思义, 流的意思就是数据的流动, 就好比停水了, 楼上的人存了一些水, 楼下的人发请求想借楼上的人一桶水, 直接搬费力又麻烦, 直接往下倒容易倒到地上, 于是可以用一根管子连接两个桶 (A 和 B), 楼上的人直接把 A 桶的里水通过管子流到楼下的 B 桶里, 这就类似于 request 对象向服务器发请求要资源, 在这 request 请求资源的传播方式通过流来实现
再举个例子, 可以把数据看成是数据流, 如果我们在键盘上打字, 电脑程序把字符一个一个输出到屏幕上, 这也可以看成是一个流, 这个可以叫做标准输出 (stdout)
二. 为什么要在 node 中使用流
看了前面稍微了解 node 的同学可能就要问了, 流的作用不就是传递数据麽, 也就是把一个地方数据拷贝到另一个地方, 不用流也可以这样实现:
- var water = fs.readFileSync('a.txt', {encoding: 'utf8'});
- fs.writeFileSync('b.txt', water);
是的, 只要使用 node 的读写文件的功能就能实现上面借水的效果, 但这样做有个致命问题:
处理数据量较大的文件时不能分块处理, 导致速度慢, 内存容易爆满
使用读写方式是把文件内容全部读入内存, 然后再写入文件, 对于小型的文本文件问题不大, 但是遇到较大的比如音频视频文件, 动辄几个 GB 大小实在承受不住, 而流可以把文件资源拆分成小块, 一块一块的运输, 资源就像水流一样进行传输, 使用流的话上述功能可以这样写:
- var fs = require('fs');
- var readStream = fs.createReadStream('a.mp4'); // 创建可读流
- var writeStream = fs.createWriteStream('b.mp4'); // 创建可写流
- readStream.on('data', function(chunk) { // 当有数据流出时, 写入数据
- writeStream.write(chunk);
- });
- readStream.on('end', function() { // 当没有数据时, 关闭数据流
- writeStream.end();
- });
但这样写还是有一些问题的, 如果说写入的速度跟不上读取的速度, 有可能导致数据丢失正常的情况应该是, 写完一段, 再读取下一段, 如果没有写完的话, 就让读取流先暂停, 等写完再继续, 所以为了让可读流和可写流速度一致, 就要用到流中必不可少的属性 pipe 了, pipe 翻译过来意思是管道, 顾名思义, 就想上面的倒水一样, 如果不用一根管子相连, A 桶倒进 B 桶的水不会均速传输, 可能会导致水的浪费, 用 pipe 可以这样解决上述问题:
- fs.createReadStream('a.mp4').pipe(fs.createWriteStream('b.mp4));
- // pipe 自动调用了 data,end 等事件'
需要特别注意的是, pipe() 只是可读流的方法, 也就是说只能从可读流中通过 pipe 方法拷贝数据到可写流, 反之则不行, 写的时候要注意顺序
三. 流的四种类型
Stream 提供了以下四种类型的流:
Readable 可读流
Writable 可写流
Duplex 可读可写流
Transform 在读写过程中可以修改和变换数据的 Duplex 流
1.Readable
可读流有五个参数:
highWaterMark 缓存区字节大小, 默认 16384
encoding 字符编码, 默认为 null, 就是 buffer
objectMode 是否操作 js 其他类型 默认 false
read 对内部的_read() 方式实现 子类实现, 父类调用
destroy 对内部的_ destroy() 方法实现 子类实现, 父类调用
可读流中分为 2 种模式流动模式和暂停模式
1 流动模式: 可读流自动读取数据, 通过 EventEmitter 接口的事件尽快将数据提供给应用
2 暂停模式: 必须显式调用 stream.read() 方法来从流中读取数据片段
暂停模式切换到流动模式 i:
1 监听 data 事件
2 调用 stream.resume() 方法
3 调用 stream.pipe() 方法将数据发送到可写流
流动模式切换到暂停模式:
1 如果不存在管道目标, 调用 stream.pause() 方法
2 如果存在管道目标, 调用 stream.unpipe() 并取消'data'事件监听
可读流事件:'data','readable','error','close','end'
监听 data 事件, 触发流动模式
- const { Readable } = require('stream');
- let i = 0;
- const rs = Readable({
- encoding: 'utf8',
- // 这里传入的 read 方法, 会被写入_read()
- read: (size) => {
- // size 为 highWaterMark 大小
- // 在这个方法里面实现获取数据, 读取到数据调用 rs.push([data]), 如果没有数据了, push(null) 结束流
- if (i < 6) {
rs.push(` 当前读取数据: ${i++}`);
- } else {
- rs.push(null);
- }
- },
- // 源代码, 可覆盖
- destroy(err, cb) {
- rs.push(null);
- cb(err);
- }
- });rs.on('data', (data) = >{
- console.log(data);
- // 每次 push 数据则触发 data 事件
监听 readable 事件, 触发暂停模式, 当流有了新数据或到了流结束之前触发 readable 事件, 需要显示调用 read([size]) 读取数据:
- const { Readable } = require('stream');
- let i = 0;
- const rs = Readable({
- encoding: 'utf8',
- highWaterMark: 9,
- // 这里传入的 read 方法, 会被写入_read()
- read: (size) => {
- // size 为 highWaterMark 大小
- // 在这个方法里面实现获取数据, 读取到数据调用 rs.push([data]), 如果没有数据了, push(null) 结束流
- if (i < 10) {
- // push 其实是把数据放入缓存区
rs.push(` 当前读取数据: ${i++}`);
- } else {
- rs.push(null);
- }
- }
- });
- rs.on('readable', () => {
- const data = rs.read(9);
- console.log(data);
- //
- })
- 2. Writable
可写流有以下参数:
highWaterMark 缓存区字节大小, 默认 16384
decodeStrings 是否将字符编码传入缓冲区
objectMode 是否操作 js 其他类型 默认 false
write 子类实现, 供父类调用 实现写入底层数据
writev 子类实现, 供父类调用 一次处理多个 chunk 写入底层数据
destroy 可以覆盖父类方法, 不能直接调用, 销毁流时, 父类调用
final 完成写入所有数据时父类触发
- const Writable = require('stream').Writable
- const writable = Writable()
- // 实现 `_write` 方法
- // 这是将数据写入底层的逻辑
- writable._write = function (data, enc, next) {
- // 将流中的数据写入底层
- process.stdout.write(data.toString().toUpperCase())
- // 写入完成时, 调用 `next()` 方法通知流传入下一个数据
- process.nextTick(next)
- }
- // 所有数据均已写入底层
- writable.on('finish', () => process.stdout.write('DONE'))
- // 将一个数据写入流中
- writable.write('a' + '\n')
- writable.write('b' + '\n')
- writable.write('c' + '\n')
- // 再无数据写入流时, 需要调用 `end` 方法
- writable.end()
- 3. Duplex
Duplex 为读写流, 既可当成可读流来使用, 也可当成可写流来使用, 实际上就是继承了 Readable 和 Writable 的一类流所以, Duplex 拥有 Writable 和 Readable 所有方法和事件, 但各自独立缓存区, 一个 Duplex 对象可以同时实现_read() 和_write() 方法
- var Duplex = require('stream').Duplex
- var duplex = Duplex()
- // 可读端底层读取逻辑
- duplex._read = function () {
- this._readNum = this._readNum || 0
- if (this._readNum > 1) {
- this.push(null)
- } else {
- this.push('' + (this._readNum++))
- }
- }
- // 可写端底层写逻辑
- duplex._write = function (buf, enc, next) {
- // a, b
- process.stdout.write('_write' + buf.toString() + '\n')
- next()
- }
- // 0, 1
- duplex.on('data', data => console.log('ondata', data.toString()))
- duplex.write('a')
- duplex.write('b')
- duplex.end()
- 4. Transform
Tranform 为转换流, 它继承自 Duplex, 并已经实现了_read 和_write 方法, 同时要求用户实现一个_transform 方法, 从一个地方读取数据, 转换数据后输出到一个地方
- const stream = require('stream');
- const transform = stream.Transform({
- transform(chunk, encoding, cb) {
- // 把数据转换成小写字母, 然后 push 到缓存区
- this.push(chunk.toString().toLowerCase());
- cb();
- }
- });
- transform.write('D');
- console.log(transform.read(1).toString()); // d
四. stream 中的 pipe
前面已经说过, pipe 的作用是在流中搭建一条管道, 从可读流中到可写流, 目的是实现读取和写入步调一致, 边读边写
- const stream = require('stream');
- const readStream = stream.Readable({
- read() {
- this.push(fs.readFileSync('a.txt')); // 文件内容 test
- this.push(null);
- }
- });
- const writeStream = stream.Writable({
- write(chunk, encoding, cb) {
- // chunk 为 test buffer
- fs.writeFileSync('b.txt', chunk.toString());
- cb();
- }
- });
- writeStream.on('pipe', data = >{
- // 触发 pipe 事件
- console.log(data);
- });
- readStream.pipe(writeStream);
来源: http://www.jianshu.com/p/8738832e7515