缓冲模式和流模式
- 缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如
fs.writeFile
、fs.readFile
等; - 流模式(stream mode),在流模式下,系统会把自己从资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。
假如我们要读取一份特别庞大的文件,这份文件有好几个 GB 大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几 GB 的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。
在 Node.js 中可以通过
buffer.constants.MAX_LENGTH
查看某套开发环境最多可支持多少字节的缓冲区。
缓冲模式压缩文件
代码语言:ts | buf-mode-gzip.ts复制// main.js
import { gzip } from 'node:zlib';
import { promisify } from 'node:util';
import { promises as fs } from 'node:fs';
const gzipPromise = promisify(gzip);
const filename = process.argv[2];
async function main() {
const data = await fs.readFile(filename);
const gzippedData = await gzipPromise(data);
await fs.writeFile(`${filename}.gz`, gzippedData);
console.log('File successfully compressed');
}
main();
运行 node ./main.js ./test.txt 就可以把 test.txt 文件压缩成 .gz 格式的压缩包了。
流模式压缩文件
代码语言:ts | stream-mode-gzip.ts复制// main.js
import { createGzip } from 'node:zlib';
import { createReadStream, createWriteStream } from 'node:fs';
const filename = process.argv[2];
createReadStream(filename)
.pipe(createGzip())
.pipe(createWriteStream(`${filename}.gz`))
.on('finish', () => console.log('File successfully compressed'));
流对象结构
在 Node.js
平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream
核心模块提供的:
Readable
Writable
Duplex
Transform
每个 stream
类的对象,本身也都是一个 EventEmmiter
实例,所有流对象实际上可以触发许多事件,比如:
Readable
流在读取完毕时会触发end
事件;Writable
流在写入完毕后会触发finish
事件;- 如果操作过程中发生错误,则会触发
error
事件;
流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:
- 二进制模式(Binary mode):以 chunk 的形式串流数据,这种模式可以用来处理缓冲或者字符串;
- 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如
Rxjs
这个库);
Readable 流(可读流)
要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式
(non-flowing),也叫暂停模式,另一种是流动模式
(flowing)。
非流动模式
下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。
代码语言:ts | read-stdin.ts复制process.stdin.on('readable', () => {
let chunk: Buffer | null;
console.log('New data available');
while((chunk = process.stdin.read()) !== null) {
// 回显
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
}
}).on('end', () => {
// Windows 上是 Ctrl Z,Linux和Mac上是 Ctrl D
console.log('End of stream');
});
readable
一旦发生(按下 Enter 键),就说明有新的数据可以读取了。
process.stdin.read()
方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。
流动模式
流动模式下,我们不通过 read()
方法提取数据,而是等着流对象把数据推送到 data
监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:
process.stdin.on('data', (chunk) => {
console.log('New data available');
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
})
.on('end', () => {
console.log('End of stream');
});
实现自己的 Readable 流
自己定制新的 Readable 流,首先必须从 stream 模块里面继承 Readable 原型,然后还必须在自己的这个具体类之中,给 _read([size])
方法提供实现代码,而这个方法内部又必须 readable.push(chunk)
这种操作向缓冲区里面填入数据。
_read()
方法和read()
方法不通,后者是给流对象的消费方使用的,而_read()
方法是我们在定制 stream 子类时必须自己实现的一个方法。一旦流准备好接受更多数据,则 _read() 将在每次调用 this.push(dataChunk) 后再次调用。 _read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。
比如下面代码,可以生成随机字符串流对象:
代码语言:ts | random-stream.ts复制import { Readable, ReadableOptions } from 'node:stream';
import Chance from 'chance';
const chance = new Chance();
export class RandomStream extends Readable {
emittedBytes = 0;
constructor(options?: ReadableOptions) {
super(options); // 继承
}
_read(size: number): void {
// 生成长度为 size 的随机字符串
const chunk = chance.string({ length: size });
// 推入内部缓冲区
this.push(chunk, 'utf8');
this.emittedBytes = chunk.length;
// 百分之 5 的概率返回 true,并推入 null
// 这样会给内部缓冲区推入 `EOF`(文件结束),表示这条数据就此结束
if (chance.bool({ likelihood: 5 })) {
this.push(null);
}
}
}
使用 RandomStream
代码语言:ts | random-stream.ts复制const randomStream = new RandomStream({
highWaterMark: 10
});
randomStream.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);
}).on('end', () => {
console.log(`Produced (${randomStream.emittedBytes}) bytes of radom data`);
});
_read()
函数中接收一个size
数字类型的参数,它是一个建议参数,意思是说,你最好尊重这个参数,只推入调用方所请求的这么多字节(即highWaterMark
配置项),当然这只是一个建议,不是强迫你必须这么做。
ReadableOptions
接收的 options 参数可能会有这样一些属性:
encoding
: 表示流对象按照什么样的编码标准,把缓冲区的数据转化成字符串,它的默认值是null
;objectMode
: 这个属性是个标志,用来表示对象模式是否启用,它的默认值是false
;highWaterMark
: 这个属性表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源之中读取数据了,它的默认值是16KB
;
简化版定制方案
如果定制的流对象比较简单,可以不用专门编写一个类,而是采用简化版的写法来制作 Readable 流。这种写法只需要调用 new Readable(options)
,并把一个包含 read()
方法的对象传给 options 参数即可。
let emittedBytes = 0;
const randomStream = new Readable({
highWaterMark: 10,
read(size: number): void {
const chunk = chance.string({ length: size });
this.push(chunk, 'utf8');
emittedBytes = chunk.length;
if (chance.bool({ likelihood: 5 })) {
this.push(null);
}
}
});
randomStream.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);
}).on('end', () => {
console.log(`Produced (${emittedBytes}) bytes of radom data`);
});
Readable.from
// main.js
有个叫做 Readable.from()
的辅助函数,让你能够把数组或者生成器、迭代器以及异步迭代器这样的 iterable
对象当做数据源,轻松构建 Readable 流。
import { Readable } from 'node:stream';
const arrStream = Readable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']);
arrStream.on('data', (char: string) => {
console.log("