[关闭]
@zhangning16 2018-01-26T03:07:07.000000Z 字数 5061 阅读 298

Node.js Stream模块学习笔记

node


如何实现一个简单的copy方法

文件的拷贝是一个很常见的需求,可是node的fs模块没有提供copy方法,但利用node提供的其他api可以很简单的实现一个copy方法。比如:

  1. const fs = require('fs')
  2. const file = fs.readFileSync('./init.txt', {encoding: 'utf8'})
  3. fs.writeFileSync('./init_copy.txt', file)

这是用同步的方式先读取文件,拿到读取到数据后,再写入进一个新的文件,也就实现了一个复制功能。
但是这样写是问题,如果如果文件过大,比如读取一个1个G视频文件,等到读取完毕再进行写入操作,将占用系统非常大的内存,而且接收端的等待时间也较长,显然是不合理的。这时候就需要引入流的概念,一边流出,一边流入。

什么是流

先来看看node.js官方文档的解释:

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。
Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。
流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

Node.js 中有四种基本的流类型:

Readable - 可读的流 (例如 fs.createReadStream()).
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket).
Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

这篇笔记只涉及到Readable和Writable,流强调的一边流入,一边流出,也就是一个产生数据,一个消费数据。把文件a复制到文件b,那么a就是提供可读的流,b就提供可写的流。

可读流API总览

可读流是对数据源的一种抽象,是数据生产商。可读流具体又分为两种状态,流动状态(flowing)和暂停状态(paused)。这两种状态是可以互相转换的,当可读流在初次创建的时候默认是处于暂停态的。我们需要手动创建提供消费数据的机制,可读流才会主动提供数据。如果消费数据的机制不存在的话,生产数据将会变得没有意义,这时候可读流也就主动停止生产数据。
从暂停状态到流动状态有三种方法
1. 给可读流添加‘data’事件监听函数。
2. 调用 writeStream.resume() 方法。
3. 调用 writeStream.pipe() 方法将数据发送到 Writable。
从流动状态到暂停状态有两种方法
1. 如果不存在管道目标(pipe destination),可以通过调用 writeStream.pause() 方法实现。
2. 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 writeStream.unpipe() 方法移除所有管道目标来实现。

可读流常用的事件

  1. 'data' 事件:'data' 事件会在流将数据传递给消费者时触发。当流转换到流动 模式时会触发该事件。调用 readable.pipe(), readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到流动模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。
  2. 'end' 事件:'end' 事件将在流中再没有数据可供消费时触发。注意: 'end' 事件只有在数据被完全消费后 才会触发 。
  3. 'error' 事件:'error' 事件可以在任何时候在可读流实现上触发。 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。
  4. 'readable' 事件:'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。

可读流常用的方法

  1. readable.pause(),调整可读流的状态为暂停态,这时可读流不会进行数据的输出。
  2. readable.resume(),恢复可读流的状态为流动态,这时可读流继续向外输出数据。
  3. readable.pipe(destination[, options]),用管道的方式向外输出数据,我的理解是,调用pipe方法,可以在可读流与可写流直接加个一个水管,这样可读流输出多少,可写流就接收多少,不存在溢出的问题。数据的传输更加可靠。
  4. readable.read([size]),读取内部缓存区中的数据,可选参数是要读取的字节大小,如果不填,则读取缓存区的全部数据。如果内部缓存区没有数据了,则返回null。
  5. readable.setEncoding(encoding),设置可读流的编码方式
  6. readable.unpipe([destination]),与pipe方法相反,用来解除管道。

可写流API总览

可写流常用的事件

  1. 'drain' 事件:如果可写流进行写操作是,比如writeStream.write()方法返回 false,说明还没写完,等写完后,流将在适当的时机触发 'drain' 事件,这时才可以继续向流中写入数据。这个时候我们可以做一些修改可读流状态的事情。
  2. 'error' 事件:'error' 事件在写入数据出错或者使用管道出错时触发。事件发生时,回调函数仅会接收到一个 Error 参数。
  3. 'finish' 事件: 在调用了 writeStream.end() 方法,且缓冲区数据都已经传给写入完毕之后,'finish' 事件将被触发。
  4. 'pipe' 事件: 在可读流上调用 stream.pipe() 方法,并在目标流向 中添加当前可写流 ( writable ) 时,将会在可写流上触发 'pipe' 事件。
  5. 'unpipe' 事件: 在 Readable 上调用 stream.unpipe() 方法,从目标流向中移除当前 Writable 时,将会触发 'unpipe' 事件。

可写流常用的方法

  1. writable.write(chunk[, encoding][, callback]),可写流的写操作,三个参数依次是可写的数据,编码格式,回调函数。
  2. writable.setDefaultEncoding(encoding),设置编码方式。
  3. writable.end([chunk][, encoding][, callback])。主动结束写入操作,第一个参数可以使一段文本,也就是最后要写入的文本,第二个参数是编码格式,第三个参数是回调函数。

使用流重写copy方法

  1. // 引入node内置的fs文件处理模块
  2. const fs = require('fs');
  3. // 读取当前目录下的init.txt文件,并且返回的是一个可读流,用readStream代表这个可读流
  4. const readStream = fs.createReadStream('./init.txt');
  5. // 创建一个的init_copy.txt文件,并且返回的是一个可写流,用writeStream代表这个可写流
  6. const writeStream = fs.createWriteStream('./init_copy.txt');
  7. // 当系统缓存中有可读流有数据流出时,会不断的触发data事件
  8. readStream.on('data', (chunk) => {
  9. // 回调函数的第一个参数,是读取到的系统缓存的数据,可写流拿到这个数据,进行写操作
  10. writeStream.write(chunk);
  11. });
  12. // 当可读流不再进行有数据流出后,将触发end事件,通过监听这个事件,手动关闭写操作。
  13. readStream.on('end', (tunck) => {
  14. console.log('主动关闭写入流');
  15. writeStream.end();
  16. })

这时候我们拷贝大文件的时候,就合理了许多,每次读一点,写一点,然后根据,不会有大量的文件堆在系统缓存中,但是仔细想想也有问题。比如这部分代码:

  1. // 当系统缓存中有可读流有数据流出时,会不断的触发data事件
  2. readStream.on('data', (chunk) => {
  3. // 回调函数的第一个参数,是读取到的系统缓存的数据,可写流拿到这个数据,进行写操作
  4. writeStream.write(chunk);
  5. });

每当可读流输出数据时,都会触发‘data’事件,这段代码都会执行,但是并不能保证读取数据和写入数据的速度是完全一样的,如果可读流提供的数据的速度远远超过可写流写入的速度,那么很有可能数据读取不及时,导致数据的丢失。所以这段代码还需要继续优化。
可读流有两种状态,流动状态(flowing)和暂停状态(paused)。为什么需要这两个状态,回想一下上面的例子,如果我们可以等到每次写入的时候,判断是否写入完毕,如果写入完毕,就继续流入数据,如果没有写入完毕,就暂停流入数据,这样就可以避免由于写入不及时导致的数据丢失,类似开关水龙头。
那么我们对代码优化一下:

  1. const fs = require('fs');
  2. const readStream = fs.createReadStream('./init.txt');
  3. const writeStream = fs.createWriteStream('./init_copy.txt');
  4. readStream.on('data', (chunk) => {
  5. // writeStream.write()方法被调用后,会返回一个布尔值,根据这个布尔值,我们可以判断是否写入完毕,为true时,表示写入完毕,为false时,表示还没写入完毕,这时我们可以让可读流手动暂停,就能够避免数据丢失的问题。
  6. if (writeStream.write(chunk) === false) {
  7. readStream.pause();
  8. }
  9. });
  10. // drain事件触发时,说明可以继续向流中写入数据,我们手动调整可读流的状态为流动状态,继续提供数据
  11. writeStream.on('drain', function() {
  12. readStream.resume();
  13. });
  14. readStream.on('end', (tunck) => {
  15. console.log('主动关闭写入流');
  16. writeStream.end();
  17. })

这样就解决了读取和写入速度不一致导致的问题。但是我们需要不断的关注数据流的状态,需要手动操作数据流,比较繁琐。

pipe管道

node实现了管道机制,只需要一行代码就可以完美的实现文件的copy。pipe() 会自动管理数据流,自动监听‘data’和‘end’事件,自动控制水流速度我们不需要担心数据流的读取与写入速度。比如这样:

  1. const fs = require('fs');
  2. const readableStream = fs.createReadStream('./init.txt');
  3. const writableStream = fs.createWriteStream('./init_copy.txt');
  4. readableStream.pipe(writableStream);

无论哪一种流,都会使用.pipe()方法来实现输入和输出。pipe() 方法返回 目标流的引用,这样就可以对流进行链式地管道操作。比如下面这个文件解压操作的例子:

  1. const fs = require("fs");
  2. const zlib = require('zlib')
  3. // 压缩 README.md 文件为 README.md.gz
  4. fs.createReadStream('./README.md')
  5. .pipe(zlib.createGzip())
  6. .pipe(fs.createWriteStream('README.md.gz'))
  7. console.log("文件压缩完成")

参考文档

一. Node.js中文文档
二. Node.js Streams 基础
三. Node中的stream (流)
四. nodejs中流(stream)的理解
五. JavaScript 标准参考教程(alpha)by阮一峰

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注