Node.js 流编程

2024-01-21 19:51:21 浏览数 (1)

缓冲模式和流模式

  • 缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如 fs.writeFilefs.readFile 等;
  • 流模式(stream mode),在流模式下,系统会把自己从资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。

假如我们要读取一份特别庞大的文件,这份文件有好几个 GB 大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几 GB 的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。

在 Node.js 中可以通过 buffer.constants.MAX_LENGTH 查看某套开发环境最多可支持多少字节的缓冲区。

缓冲模式压缩文件

代码语言:ts | buf-mode-gzip.ts复制
// main.js

import { gzip } from 'node:zlib';

import { promisify } from 'node:util';

import { promises as fs } from 'node:fs';



const gzipPromise = promisify(gzip);



const filename = process.argv[2];



async function main() {

    const data = await fs.readFile(filename);

    const gzippedData = await gzipPromise(data);

    await fs.writeFile(`${filename}.gz`, gzippedData);

    console.log('File successfully compressed');

}



main();

运行 node ./main.js ./test.txt 就可以把 test.txt 文件压缩成 .gz 格式的压缩包了。

流模式压缩文件

代码语言:ts | stream-mode-gzip.ts复制
// main.js

import { createGzip } from 'node:zlib';

import { createReadStream, createWriteStream } from 'node:fs';



const filename = process.argv[2];



createReadStream(filename)

    .pipe(createGzip())

    .pipe(createWriteStream(`${filename}.gz`))

    .on('finish', () => console.log('File successfully compressed'));

流对象结构

Node.js 平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream 核心模块提供的:

  • Readable
  • Writable
  • Duplex
  • Transform

每个 stream 类的对象,本身也都是一个 EventEmmiter 实例,所有流对象实际上可以触发许多事件,比如:

  • Readable 流在读取完毕时会触发 end 事件;
  • Writable 流在写入完毕后会触发 finish 事件;
  • 如果操作过程中发生错误,则会触发 error 事件;

流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:

  • 二进制模式(Binary mode):以 chunk 的形式串流数据,这种模式可以用来处理缓冲或者字符串;
  • 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如 Rxjs 这个库);

Readable 流(可读流)

要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式(non-flowing),也叫暂停模式,另一种是流动模式(flowing)。

非流动模式

下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。

代码语言:ts | read-stdin.ts复制
process.stdin.on('readable', () => {

    let chunk: Buffer | null;

    console.log('New data available');

    while((chunk = process.stdin.read()) !== null) {

        // 回显

        console.log(

            `Chunk read(${chunk.length} bytes: "${chunk.toString()}")`

        );

    }

}).on('end', () => {

    // Windows 上是 Ctrl Z,Linux和Mac上是 Ctrl D

    console.log('End of stream');

});

readable 一旦发生(按下 Enter 键),就说明有新的数据可以读取了。

process.stdin.read() 方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。

流动模式

流动模式下,我们不通过 read() 方法提取数据,而是等着流对象把数据推送到 data 监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:

代码语言:ts | read-stdin.ts复制
process.stdin.on('data', (chunk) => {

    console.log('New data available');

    console.log(

        `Chunk read(${chunk.length} bytes: "${chunk.toString()}")`

    );

})

.on('end', () => {

    console.log('End of stream');

});

实现自己的 Readable 流

自己定制新的 Readable 流,首先必须从 stream 模块里面继承 Readable 原型,然后还必须在自己的这个具体类之中,给 _read([size]) 方法提供实现代码,而这个方法内部又必须 readable.push(chunk) 这种操作向缓冲区里面填入数据。

_read() 方法和 read() 方法不通,后者是给流对象的消费方使用的,而 _read() 方法是我们在定制 stream 子类时必须自己实现的一个方法。一旦流准备好接受更多数据,则 _read() 将在每次调用 this.push(dataChunk) 后再次调用。 _read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。

比如下面代码,可以生成随机字符串流对象:

代码语言:ts | random-stream.ts复制
import { Readable, ReadableOptions } from 'node:stream';

import Chance from 'chance';



const chance = new Chance();



export class RandomStream extends Readable {

    emittedBytes = 0;

    constructor(options?: ReadableOptions) {

        super(options); // 继承

    }



    _read(size: number): void {

        // 生成长度为 size 的随机字符串

        const chunk = chance.string({ length: size });

        // 推入内部缓冲区

        this.push(chunk, 'utf8');

        this.emittedBytes  = chunk.length;

        // 百分之 5 的概率返回 true,并推入 null

        // 这样会给内部缓冲区推入 `EOF`(文件结束),表示这条数据就此结束

        if (chance.bool({ likelihood: 5 })) {

            this.push(null);

        }

    }

}
使用 RandomStream
代码语言:ts | random-stream.ts复制
const randomStream = new RandomStream({

    highWaterMark: 10

});

randomStream.on('data', (chunk) => {

    console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);

}).on('end', () => {

    console.log(`Produced (${randomStream.emittedBytes}) bytes of radom data`);

});

_read() 函数中接收一个 size 数字类型的参数,它是一个建议参数,意思是说,你最好尊重这个参数,只推入调用方所请求的这么多字节(即 highWaterMark 配置项),当然这只是一个建议,不是强迫你必须这么做。

ReadableOptions 接收的 options 参数可能会有这样一些属性:

  • encoding: 表示流对象按照什么样的编码标准,把缓冲区的数据转化成字符串,它的默认值是 null
  • objectMode: 这个属性是个标志,用来表示对象模式是否启用,它的默认值是 false
  • highWaterMark: 这个属性表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源之中读取数据了,它的默认值是 16KB

简化版定制方案

如果定制的流对象比较简单,可以不用专门编写一个类,而是采用简化版的写法来制作 Readable 流。这种写法只需要调用 new Readable(options),并把一个包含 read() 方法的对象传给 options 参数即可。

代码语言:ts | random-stream.ts复制
let emittedBytes = 0;

const randomStream = new Readable({

    highWaterMark: 10,

    read(size: number): void {

        const chunk = chance.string({ length: size });

        this.push(chunk, 'utf8');

        emittedBytes  = chunk.length;

        if (chance.bool({ likelihood: 5 })) {

            this.push(null);

        }

    }

});



randomStream.on('data', (chunk) => {

    console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);

}).on('end', () => {

    console.log(`Produced (${emittedBytes}) bytes of radom data`);

});

Readable.from

// main.js

有个叫做 Readable.from() 的辅助函数,让你能够把数组或者生成器、迭代器以及异步迭代器这样的 iterable 对象当做数据源,轻松构建 Readable 流。

代码语言:ts | exercise.ts复制
import { Readable } from 'node:stream';



const arrStream = Readable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']);



arrStream.on('data', (char: string) => {

    console.log("


	

0 人点赞