nodejs流基类源码分析

2020-07-22 16:52:48 浏览数 (3)

流是对数据生产,消费的一种抽象,今天先分析一下流基类的实现

代码语言:javascript复制
const EE = require('events');
const util = require('util');
// 流的基类
function Stream() {
  EE.call(this);
}
// 继承事件订阅分发的能力
util.inherits(Stream, EE);

流的基类只提供了一个函数就是pipe。用于实现管道化。这个方法代码比较多,分开说。

1 处理数据事件

代码语言:javascript复制
 function ondata(chunk) {
    // 源流有数据到达,并且目的流可写
    if (dest.writable) {
      // 目的流过载并且源流实现了pause方法,那就暂停可读流的读取操作,等待目的流触发drain事件
      if (false === dest.write(chunk) && source.pause) {
        source.pause();
      }
    }
 }
  // 监听data事件,可读流有数据的时候,会触发data事件
  source.on('data', ondata);

  function ondrain() {
    // 目的流可写了,并且可读流可读,切换成自动读取模式
    if (source.readable && source.resume) {
      source.resume();
    }
  }
  // 监听drain事件,目的流可以消费数据了就会触发该事件
  dest.on('drain', ondrain);

这是管道化时流控实现的地方,主要是利用了write返回值和drain事件。

流关闭/结束处理

代码语言:javascript复制
 // 目的流不是标准输出或标准错误,并且end不等于false
  if (!dest._isStdio && (!options || options.end !== false)) {
    // 源流没有数据可读了,执行end回调,告诉目的流,没有数据可读了
    source.on('end', onend);
    // 源流关闭了,执行close回调
    source.on('close', onclose);
  }
  // 两个函数只会执行一次,也只会执行一个
  var didOnEnd = false;
  function onend() {
    if (didOnEnd) return;
    didOnEnd = true;
    // 执行目的流的end函数,说明写数据完毕
    dest.end();
  }

  function onclose() {
    if (didOnEnd) return;
    didOnEnd = true;
    // 销毁目的流
    if (typeof dest.destroy === 'function') dest.destroy();
  }

这里是处理源流结束和关闭后,通知目的流的逻辑。

错误处理和事件清除

代码语言:javascript复制
  // remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  function onerror(er) {
    // 出错了,清除注册的事件,包括正在执行的onerror函数
    cleanup();
    // 如果用户没有监听流的error事件,则抛出错误,所以我们业务代码需要监听error事件
    if (EE.listenerCount(this, 'error') === 0) {
      throw er; // Unhandled stream error in pipe.
    }
  }
  // 监听流的error事件
  source.on('error', onerror);
  dest.on('error', onerror);
  // 源流关闭或者没有数据可读时,清除注册的事件
  source.on('end', cleanup);
  source.on('close', cleanup);
  // 目的流关闭了也清除他注册的事件
  dest.on('close', cleanup);

这里主要是处理了error事件和流关闭/结束/出错时清除订阅的事件。这就是流基类的所有逻辑。

0 人点赞