背景
之前在开发 ASP.NET 的时候, 根据源代码依次追踪整个程序的执行过程, 发现最重要的过程是基于一个管道流的, 像水管一样, 依次处理管道流中的十几个事件, 当时对流的认知是四个字, 依次执行那么现在做 Node 的开发, 对于 Node 中的流是另四个字, 那就是源源不断本篇文章主要目的是带大家手写可读流与可写流
简介
在 Node 中, 请求流, 响应流, 文件流等都是基于 stream 模块封装的简单的理解, 流就是将大块的东西, 分小块依次处理就像你需要 10kg 的水, 水管就一点点的源源不断的流出来给你又如在程序中
fs.readFileSync('/demo.txt', {encoding:'utf8'});fs.writeFileSync('/demo.txt', data);
以上两个方法是把文件内容全部读入内存, 然后再写入文件, 但是如果文件过大就会出现问题了, 内存容易爆掉这里就需要用到流了, 一点点的读取或者写入
分类
Readable - 可读的流 (例如 fs.createReadStream()).
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket).
Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
Readable - 可读的流介绍与实现
可读流分为两种模式: flowing 和 paused, 并且两种模式可以相互转换
1. 在 flowing 模式下, 可读流自动从系统底层读取数据, 并通过 EventEmitter 接口的事件尽快将数据提供给应用
2. 在 paused 模式下, 必须显式调用 stream.read() 方法来从流中读取数据片段
3. 所有初始工作模式为 paused 的 Readable 流, 可以通过下面三种途径切换到 flowing 模式:
监听'data' 事件
调用 stream.resume() 方法
调用 stream.pipe() 方法将数据发送到 Writable
4. 可读流可以通过下面途径切换到 paused 模式:
如果不存在管道目标 (pipe destination), 可以通过调用 stream.pause() 方法实现
如果存在管道目标, 可以通过取消'data' 事件监听, 并调用 stream.unpipe() 方法移除所有管道目标来实现
5. 为了便于理解, 这里分开写两种模式, 下面为 flowing 模式基本实现
流程图:
定义类首先要继承自 EventEmitter, 因为要发射监听事件, 在构造函数中依次定义各个参数, 其中需要说明的是 this.flowing 用于切换模式, this.buffer 并非是缓存, 因为流动模式是默认不走缓存的, 这个 buffer 是读取的时候 fs.read 的一个参数, 所有的事件监听到都会执行 newListener 当该类被构造的时候就打开文件, 并监听事件, 如果是 data, 默认走流动模式开始读取
- class ReadStream extends EventEmitter {
- constructor(path, options) {
- super(path, options);
- this.path = path; // 写入路径
- this.flags = options.flags || 'r'; // 操作修饰符
- this.mode = options.mode || 0o666; // 权限
- this.autoClose = options.autoClose; // 是否自动关闭
- this.highWaterMark = options.highWaterMark || 64 * 1024; // 默认 64k
- this.pos = this.start = options.start || 0; // 起始位置
- this.end = options.end; // 结束位置
- this.encoding = options.encoding; // 编码
- this.flowing = null; // 流动模式
- this.buffer = Buffer.alloc(this.highWaterMark); // 读取的 buffer 不是缓存
- this.open();
- this.on('newListener', (type, listener) = >{
- if (type == 'data') {
- this.flowing = true;
- this.read();
- }
- })
- }
open 方法 打开传入路径的文件 如果出错并且设置自动关闭属性, 直接关闭打开, 发射 error 事件如果成功了, 发射 open 事件, 供读取方法接收
- open() {
- fs.open(this.path, this.flags, this.mode, (err, fd) = >{
- if (err) {
- if (this.autoClose) {
- this.destroy();
- return this.emit('error', err);
- }
- }
- this.fd = fd;
- this.emit('open');
- })
- }
最重要的方法 Read, 一进入就需要判断文件是否已经打开了, 因为文件打开是异步的一个过程, 此时可能并未打开, 如果没有打开就监听发射的 open 事件, 然后在回调函数中进行 read 方法的调用其中核心是每次需要读多少, 这个量由传入的开始结束位置已经最高水位线决定的最高水位线代表依次最多读取多少, 默认值是 64kb 那么每次读取的值 howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
- read() {
- if (typeof this.fd != 'number') {
- return this.once('open', () = >this.read());
- }
- let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
- fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytes) = >{ //bytes 是实际读到的字节数
- if (err) {
- if (this.autoClose) this.destroy();
- return this.emit('error', err);
- }
- if (bytes) {
- let data = this.buffer.slice(0, bytes);
- this.pos += bytes;
- data = this.encoding ? data.toString(this.encoding) : data;
- this.emit('data', data);
- if (this.end && this.pos > this.end) {
- return this.endFn();
- } else {
- if (this.flowing) this.read();
- }
- } else {
- return this.endFn();
- }
- })
- }
pipe 方法实现 pipe 方法就是边读取边写入, 控制读写速度, 当可写流的写入返回 false 时, 暂停读取, 当可写流触发 drain 事件后, 恢复读取
- pipe(ws) {
- this.on('data', data = >{
- let flag = ws.write(data);
- if (!flag) {
- this.pause();
- }
- }) ws.on('drain', () = >{
- this.resume();
- })
- }
- pause() {
- this.flowing = false;
- }
- resume() {
- this.flowing = true;
- this.read();
- }
暂停模式 暂停模式不同的是需要走缓存, 并且监听的是 readable 事件 (这里我只贴出具有差异性的代码) 流程图
在构造函数中需要加入 this.buffers = [];(源码中为了提高效率, 使用的是链表结构, 这里我用数组代替), 以及 readable 事件的监听
- this.on('newListener', (type) = >{
- if (type == 'data') {
- this.flowing = true;
- this.read();
- }
- if (type == 'readable') {
- this.read(0);
- }
- });
这里 read 方法需要传入一个 n, 表示需要读取的字节数如果判断缓存的大小, 即 this.length, 如果 this.length == 0 || this.length < this.highWaterMark , 执行_read() 方法, 此时执行的 n 为 0
- let _read = () = >{
- let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
- fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) = >{
- if (err) {
- return
- }
- let data;
- if (bytesRead > 0) {
- data = this.buffer.slice(0, bytesRead);
- this.pos += bytesRead;
- this.length += bytesRead;
- if (this.end && this.pos > this.end) {
- if (this.needReadable) {
- this.emit('readable');
- }
- this.emit('end');
- } else {
- this.buffers.push(data);
- if (this.needReadable) {
- this.emit('readable');
- this.needReadable = false;
- }
- }
- } else {
- if (this.needReadable) {
- this.emit('readable');
- }
- return this.emit('end');
- }
- })
- }
如果传入的 n 值 0 < n < this.length, 走以下逻辑, 即从缓存区中读取相应的字节数进行读取
- if (0 < n < this.length) {
- ret = Buffer.alloc(n);
- let b;
- let index = 0;
- while (null != (b = this.buffers.shift())) {
- for (let i = 0; i < b.length; i++) {
- ret[index++] = b[i];
- if (index == ret.length) {
- this.length -= n;
- b = b.slice(i + 1);
- this.buffers.unshift(b);
- break;
- }
- }
- }
- if (this.encoding) ret = ret.toString(this.encoding);
- }
Writable - 可读的流介绍与实现
流程图
构造函数跟上面差距不大, 有一个 this.buffer 的缓存区, 并且最高水位线默认为 16k
- // 构造函数
- constructor(path, options) {
- super(path, options);
- this.path = path; // 写入路径
- this.flags = options.flags || 'w'; // 操作修饰符
- this.mode = options.mode || 0o666; // 权限
- this.start = options.start || 0; // 写入的起始位置
- this.pos = this.start; // 文件的写入索引
- this.encoding = options.encoding || 'utf8'; // 编码
- this.autoClose = options.autoClose; // 自动关闭
- this.highWaterMark = options.highWaterMark || 16 * 1024; // 默认最高水位线 16k
- this.buffers = []; // 缓存区 源码里面是链表
- this.writing = false; // 标识内部正在写入数据
- this.length = 0; // 标识缓存区字节的长度
- this.open(); // 默认一创建就打开文件
- }
open 即打开文件是一样的, 这里不再描述最重要的 write 方法, 该方法有一个返回值, 标识缓存区的长度是否超过了最高水位线, 特别注意的是如果超过了也会写入进去的, 因为会放入到缓存区中, 等到 drain 事件触发, 再接着写入, 具体写入是执行的_write 方法, 其中有一个 this.writing 标识是否正在写入, 如果正在写入, 则放入缓存区中, 在清空缓存区的时候依次取出写入, 即以下的 clearBuffer 方法, 此方法中当缓存区清空了以后触发 drain 事件, 这里需要特殊说明一下, 如果缓存区从未满过, 是不会触发这个事件的
- write(chunk, encoding, callback) {
- chunk = Buffer.isBuffer(chunk) ? chunk: Buffer.from(chunk, this.encoding); // 此方法只吸入 buffer 或者字符串
- this.length += chunk.length; // 当前缓存区的长度
- if (this.writing) { // 如果正在写入数据 则需要把数据放入缓存区里面
- this.buffers.push({
- chunk,
- encoding,
- callback
- })
- } else { // 如果当前空闲 直接调用底层写入的方法进行写入 并且在写完以后 清空缓存区
- this.writing = true;
- this._write(chunk, encoding, () = >{
- callback && callback();
- this.clearBuffer();
- })
- }
- //write 方法有一个返回值 表示当前缓存区是否超过了最高水位线 即是否能继续写入
- return this.length < this.highWaterMark;
- }
- _write(chunk, encoding, callback) {
- if (typeof this.fd != 'number') { // 因为是异步的 文件可能在这个时候并未打开
- return this.once('open', () = >this._write(chunk, encoding, callback));
- }
- fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) = >{
- if (err) {
- if (this.autoClose) {
- this.destory();
- }
- return this.emit('error', err);
- }
- this.pos += bytesWritten;
- this.length -= bytesWritten;
- callback && callback();
- })
- }
- clearBuffer() {
- let data = this.buffers.shift();
- if (data) {
- this._write(data.chunk, data.encoding, () = >{
- data.callback && data.callback();
- this.clearBuffer();
- })
- } else {
- this.writing = false;
- // 缓存区清空了 缓存区如果没有满过 是不会触发这个事件的
- this.emit('drain');
- }
- }
参考链接
Node.js API 文档
深入理解 Node Stream 内部机制
来源: https://juejin.im/post/5a7278dc6fb9a01c9c1f59e7