流是对数据生产,消费的一种抽象,今天先分析一下流基类的实现
代码语言:javascript复制const EE = require('events');
const util = require('util');
// 流的基类
function Stream() {
EE.call(this);
}
// 继承事件订阅分发的能力
util.inherits(Stream, EE);
流的基类只提供了一个函数就是pipe。用于实现管道化。这个方法代码比较多,分开说。
1 处理数据事件
代码语言:javascript复制 function ondata(chunk) {
// 源流有数据到达,并且目的流可写
if (dest.writable) {
// 目的流过载并且源流实现了pause方法,那就暂停可读流的读取操作,等待目的流触发drain事件
if (false === dest.write(chunk) && source.pause) {
source.pause();
}
}
}
// 监听data事件,可读流有数据的时候,会触发data事件
source.on('data', ondata);
function ondrain() {
// 目的流可写了,并且可读流可读,切换成自动读取模式
if (source.readable && source.resume) {
source.resume();
}
}
// 监听drain事件,目的流可以消费数据了就会触发该事件
dest.on('drain', ondrain);
这是管道化时流控实现的地方,主要是利用了write返回值和drain事件。
流关闭/结束处理
代码语言:javascript复制 // 目的流不是标准输出或标准错误,并且end不等于false
if (!dest._isStdio && (!options || options.end !== false)) {
// 源流没有数据可读了,执行end回调,告诉目的流,没有数据可读了
source.on('end', onend);
// 源流关闭了,执行close回调
source.on('close', onclose);
}
// 两个函数只会执行一次,也只会执行一个
var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;
// 执行目的流的end函数,说明写数据完毕
dest.end();
}
function onclose() {
if (didOnEnd) return;
didOnEnd = true;
// 销毁目的流
if (typeof dest.destroy === 'function') dest.destroy();
}
这里是处理源流结束和关闭后,通知目的流的逻辑。
错误处理和事件清除
代码语言:javascript复制 // remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata);
dest.removeListener('drain', ondrain);
source.removeListener('end', onend);
source.removeListener('close', onclose);
source.removeListener('error', onerror);
dest.removeListener('error', onerror);
source.removeListener('end', cleanup);
source.removeListener('close', cleanup);
dest.removeListener('close', cleanup);
}
function onerror(er) {
// 出错了,清除注册的事件,包括正在执行的onerror函数
cleanup();
// 如果用户没有监听流的error事件,则抛出错误,所以我们业务代码需要监听error事件
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
}
}
// 监听流的error事件
source.on('error', onerror);
dest.on('error', onerror);
// 源流关闭或者没有数据可读时,清除注册的事件
source.on('end', cleanup);
source.on('close', cleanup);
// 目的流关闭了也清除他注册的事件
dest.on('close', cleanup);
这里主要是处理了error事件和流关闭/结束/出错时清除订阅的事件。这就是流基类的所有逻辑。