可写流 (Writable Stream)
可写流是对数据写入'目的地'的一种抽象.
可写流的原理其实与可读流类似, 当数据过来的时候会写入缓存池, 当写入的速度很慢或者写入暂停时候, 数据流便会进入到队列池缓存起来, 当然即使缓存池满了, 剩余的数据也是存在内存
可写流的简单用法如下代码
- let fs = require('fs');
- let path = require('path');
- let ws = fs.createWriteStream(path.join(__dirname, '1.txt'), {
- highWaterMark: 3,
- autoClose: true,
- flags: 'w',
- encoding: 'utf8',
- mode: 0o666,
- start: 0,
- });
- let i = 9;
- function write() {
- let flag = true;
- while (i> 0 && flag) {
- flag = ws.write(--i + '','utf8', () =>{
- console.log('ok')
- });
- console.log(flag)
- }
- }
- write();
- // drain 只有当缓存区充满后 并且被消费后触发
- ws.on('drain',
- function() {
- console.log('抽干') write();
- });
实现原理
现在就让我们来实现一个简单的可写流, 来研究可写流的内部原理, 可写流有很多方法与可读流类似, 这里不在重复了首先要有一个构造函数来定义一些基本选项属性, 然后调用一个 open 放法打开文件, 并且有一个 destroy 方法来处理关闭逻辑
- let EventEmitter = require('events');
- let fs = require('fs');
- class WriteStream extends EventEmitter {
- constructor(path,options) {
- super();
- this.path = path;
- this.highWaterMark = options.highWaterMark || 16 * 1024;
- this.autoClose = options.autoClose || true;
- this.mode = options.mode;
- this.start = options.start || 0;
- this.flags = options.flags || 'w';
- this.encoding = options.encoding || 'utf8';
- // 可写流 要有一个缓存区, 当正在写入文件是, 内容要写入到缓存区中
- // 在源码中是一个链表 => []
- this.buffers = [];
- // 标识 是否正在写入
- this.writing = false;
- // 是否满足触发 drain 事件
- this.needDrain = false;
- // 记录写入的位置
- this.pos = 0;
- // 记录缓存区的大小
- this.length = 0;
- this.open();
- }
- destroy() {
- if (typeof this.fd !== 'number') {
- return this.emit('close');
- }
- fs.close(this.fd, () => {
- this.emit('close')
- });
- }
- open() {
- fs.open(this.path, this.flags, this.mode, (err,fd) => {
- if (err) {
- this.emit('error', err);
- if (this.autoClose) {
- this.destroy();
- }
- return;
- }
- this.fd = fd;
- this.emit('open');
- })
- }
- }
- module.exports = WriteStream;
接着我们实现 write 方法来让可写流对象调用, 在 write 方法中我们首先将数据转化为 buffer, 接着实现一些事件的触发条件的逻辑, 如果现在没有正在写入的话我们就要真正的进行写入操作了, 这里我们实现一个_write 方法来实现写入操作, 否则则代表文件正在写入, 那我们就将流传来的数据先放在缓存区中, 保证写入数据不会同时进行.
- write(chunk,encoding=this.encoding,callback=()=>{}){
- chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
- // write 返回一个 boolean 类型
- this.length+=chunk.length;
- let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小
- this.needDrain = !ret; // 是否需要触发 needDrain
- // 判断是否正在写入 如果是正在写入 就写入到缓存区中
- if(this.writing){
- this.buffers.push({
- encoding,
- chunk,
- callback
- }); // []
- }else{
- // 专门用来将内容 写入到文件内
- this.writing = true;
- this._write(chunk,encoding,()=>{
- callback();
- this.clearBuffer();
- }); // 8
- }
- return ret;
- }
- _write(chunk,encoding,callback){
- if(typeof this.fd !== 'number'){
- return this.once('open',()=>this._write(chunk,encoding,callback));
- }
- fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
- this.length -= byteWritten;
- this.pos += byteWritten;
- callback(); // 清空缓存区的内容
- });
- }
_write 写入之后的回调中我们会调用传入回调函数 clearBuffer, 这个方法会去 buffers 中继续递归地把数据取出, 然后继续调用_write 方法去写入, 直到全部 buffer 中的数据取出后, 这样就清空了 buffers.
- clearBuffer(){
- let buffer = this.buffers.shift();
- if(buffer){
- this._write(buffer.chunk,buffer.encoding,()=>{
- buffer.callback();
- this.clearBuffer()
- });
- }else{
- this.writing = false;
- if(this.needDrain){ // 是否需要触发 drain 需要就发射 drain 事件
- this.needDrain = false;
- this.emit('drain');
- }
- }
- }
最后附上完整的代码
- let EventEmitter = require('events');
- let fs = require('fs');
- class WriteStream extends EventEmitter{
- constructor(path,options){
- super();
- this.path = path;
- this.highWaterMark = options.highWaterMark||16*1024;
- this.autoClose = options.autoClose||true;
- this.mode = options.mode;
- this.start = options.start||0;
- this.flags = options.flags||'w';
- this.encoding = options.encoding || 'utf8';
- // 可写流 要有一个缓存区, 当正在写入文件是, 内容要写入到缓存区中
- // 在源码中是一个链表 => []
- this.buffers = [];
- // 标识 是否正在写入
- this.writing = false;
- // 是否满足触发 drain 事件
- this.needDrain = false;
- // 记录写入的位置
- this.pos = 0;
- // 记录缓存区的大小
- this.length = 0;
- this.open();
- }
- destroy(){
- if(typeof this.fd !=='number'){
- return this.emit('close');
- }
- fs.close(this.fd,()=>{
- this.emit('close')
- })
- }
- open(){
- fs.open(this.path,this.flags,this.mode,(err,fd)=>{
- if(err){
- this.emit('error',err);
- if(this.autoClose){
- this.destroy();
- }
- return
- }
- this.fd = fd;
- this.emit('open');
- })
- }
- write(chunk,encoding=this.encoding,callback=()=>{}){
- chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
- // write 返回一个 boolean 类型
- this.length+=chunk.length;
- let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小
- this.needDrain = !ret; // 是否需要触发 needDrain
- // 判断是否正在写入 如果是正在写入 就写入到缓存区中
- if(this.writing){
- this.buffers.push({
- encoding,
- chunk,
- callback
- }); // []
- }else{
- // 专门用来将内容 写入到文件内
- this.writing = true;
- this._write(chunk,encoding,()=>{
- callback();
- this.clearBuffer();
- }); // 8
- }
- return ret;
- }
- clearBuffer(){
- let buffer = this.buffers.shift();
- if(buffer){
- this._write(buffer.chunk,buffer.encoding,()=>{
- buffer.callback();
- this.clearBuffer()
- });
- }else{
- this.writing = false;
- if(this.needDrain){ // 是否需要触发 drain 需要就发射 drain 事件
- this.needDrain = false;
- this.emit('drain');
- }
- }
- }
- _write(chunk,encoding,callback){
- if(typeof this.fd !== 'number'){
- return this.once('open',()=>this._write(chunk,encoding,callback));
- }
- fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
- this.length -= byteWritten;
- this.pos += byteWritten;
- callback(); // 清空缓存区的内容
- });
- }
- }
- module.exports = WriteStream;
Pipe 管道流
前面我们了解了可读流与可写流, 那么怎么让二者结合起来使用呢, node 给我们提供好了方法 --Pipe 管道, 流顾名思义, 就是在可读流与可写流中间加入一个管道, 实现一边读取, 一边写入, 读一点写一点.
Pipe 的使用方法如下
- let fs = require('fs');
- let path = require('path');
- let ReadStream = require('./ReadStream');
- let WriteStream = require('./WriteStream');
- let rs = new ReadStream(path.join(__dirname, './1.txt'), {
- highWaterMark: 4
- });
- let ws = new WriteStream(path.join(__dirname, './2.txt'), {
- highWaterMark: 1
- });
- // 4 1
- rs.pipe(ws);
实现原理
Pipe 的原理比较简单, 简单说监听可读流的 data 事件来持续获取文件中的数据, 然后我们就会去调用写流的 write 方法. 如果可写流缓存区已满, 那么当我们得到调用可读流的 pause 方法来暂停读取, 然后等到写流的缓存区已经全部写入并且触发 drain 事件时, 我们就会调用 resume 重新开启读取的流程. 上代码
- pipe(ws) {
- this.on('data', (chunk) => {
- let flag = ws.write(chunk);
- if (!flag) {
- this.pause();
- }
- });
- ws.on('drain', () => {
- this.resume();
- })
- }
自定义流
Node 允许我们自定义流, 读流继承于 Readable 接口, 写流则继承于 Writable 接口, 所以我们其实是可以自定义一个流模块, 只要继承 stream 模块对应的接口即可.
自定义可读流
如果我们要自定义读流的话, 那我们就需要继承 Readable,Readable 里面有一个 read() 方法, 默认调用_read(), 所以我们只要复写了_read() 方法就可实现读取的逻辑, 同时 Readable 中也提供了一个 push 方法, 调用 push 方法就会触发 data 事件, push 中的参数就是 data 事件回调函数的参数, 当 push 传入的参数为 null 的时候就代表读流停止, 上代码
- let { Readable } = require('stream');
- // 想实现什么流 就继承这个流
- // Readable 里面有一个 read() 方法, 默认掉_read()
- // Readable 中提供了一个 push 方法你调用 push 方法就会触发 data 事件
- let index = 9;
- class MyRead extends Readable {
- _read() {
- // 可读流什么时候停止呢? 当 push null 的时候停止
- if (index--> 0) return this.push('123');
- this.push(null);
- }
- }
- let mr = new MyRead();
- mr.on('data', function(data) {
- console.log(data);
- });
自定义可写流
与自定义读流类似, 自定义写流需要继承 Writable 接口, 并且实现一个_write() 方法, 这里注意的是_write 中可以传入 3 个参数, chunk, encoding, callback,chunk 就是代表写入的数据, 通常是一个 buffer,encoding 是编码类型, 通常不会用到, 最后的 callback 要注意, 它并不是我们用这个自定义写流调用 write 时的回调, 而是我们上面讲到写流实现时的 clearBuffer 函数.
- let {
- Writable
- } = require('stream');
- // 可写流实现_write 方法
- // 源码中默认调用的是 Writable 中的 write 方法
- class MyWrite extends Writable {
- _write(chunk, encoding, callback) {
- console.log(chunk.toString());
- callback(); // clearBuffer
- }
- }
- let mw = new MyWrite();
- mw.write('111', 'utf8', () =>{
- console.log(1);
- }) mw.write('222', 'utf8', () =>{
- console.log(1);
- });
Duplex 双工流
双工流其实就是结合了上面我们说的自定义读流和自定义写流, 它既能读也能写, 同时可以做到读写之间互不干扰
- let { Duplex } = require('stream');
- // 双工流 又能读 又能写, 而且读取可以没关系 (互不干扰)
- let d = Duplex({
- read() {
- this.push('hello');
- this.push(null);
- },
- write(chunk, encoding, callback) {
- console.log(chunk);
- callback();
- }
- });
- d.on('data', function(data) {
- console.log(data);
- });
- d.write('hello');
Transform 转换流
转换流的本质就是双工流, 唯一不同的是它并不需要像上面提到的双工流一样实现 read 和 write, 它只需要实现一个 transform 方法用于转换
- let { Transform } = require('stream');
- // 它的参数和可写流一样
- let tranform1 = Transform({
- transform(chunk, encoding, callback) {
- this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中
- callback();
- }
- });
- let tranform2 = Transform({
- transform(chunk, encoding, callback){
- console.log(chunk.toString());
- callback();
- }
- });
- // 等待你的输入
- // rs.pipe(ws);
- // 希望将输入的内容转化成大写在输出出来
- process.stdin.pipe(tranform1).pipe(tranform2);
- // 对象流 可读流里只能放 buffer 或者字符串 对象流里可以放对象
对象流
默认情况下, 流处理的数据是 Buffer/String 类型的值. 对象流的特点就是它有一个 objectMode 标志, 我们可以设置它让流可以接受任何 JavaScript 对象. 上代码
- const {
- Transform
- } = require('stream');
- let fs = require('fs');
- let rs = fs.createReadStream('./users.json');
- rs.setEncoding('utf8');
- let toJson = Transform({
- readableObjectMode: true,
- transform(chunk, encoding, callback) {
- this.push(JSON.parse(chunk));
- callback();
- }
- });
- let jsonOut = Transform({
- writableObjectMode: true,
- transform(chunk, encoding, callback) {
- console.log(chunk);
- callback();
- }
- });
- rs.pipe(toJson).pipe(jsonOut);
来源: https://juejin.im/post/5ac238cc6fb9a028b4113bdc