从 koa-body 入手分析,搞懂 Node.js 文件上传流程

2022-03-10 17:38:59 浏览数 (2)

故事背景

某天,团队的新来的很爱问问题的小伙伴突然问我:怎么在Koa服务中处理接收上传的文件数据?

我答:这个简单,你在koa-body里配一下multipart,然后在ctx.request.files取一下

他又问:好的。为什么配置了multipart就可以在ctx.request.files拿到呢?

我又答:因为koa-body帮你处理了

他再问:好的。那它是怎么处理的呢?

我:**...我去看下源码再来回答你**

梳理思路

通过前面刨根问底的对话,我们可以提炼出三个问题:

  • WHAT:通过什么能使Koa解析文件上传呢? 答:koa-body
  • HOW:如何进行配置呢? 答:开启multipart配置(注:更多细节在formidable配置中)
  • WHY:为什么 koa-body 可以解析上传的文件,它是什么时候在 ctx 上增加的属性呢?

前两个问题,我们现在已经有了答案。作为一个开发者,在掌握了WHAT和HOW之后,我们更应该沉下心来,对WHY背后的原理进行探索。要知其然更知其所以然

那么,对于这类原理性的问题,我们的思路不用多说:看源码

简单分析 koa-body 源码

入口文件

我们分析NPM依赖肯定要从入口文件进行分析,既package.json中的main字段开始,一般来说都是index.js

根据Koa.js的中间件实现规范,我们可以从上面的代码中了解到:

  1. requestBody方法返回的function才是是真正执行的中间件
  2. 服务启动时,requestBody方法会初始化相关配置

requestBody 返回中间件实现

接下来我们来看中间件真正实现的代码。

由于屏幕原因我对一些无关的代码进行了折叠,我们重点看红框处。

  1. opts.multipart(配置验证)与ctx.is('multipart')(请求头验证)都为true时,判断为文件上传场景,调用formy方法
  2. formy方法返回的promise实例resolved时,将promise实例返回的数据附加到ctx.request.bodyctx.request.files

这下WHY的后半部分谜底解开了:在真实处理逻辑的promise实例resolved后,koa-body会将返回的数据附加在ctx.request

formy 方法实现

通过上一部分的截图,我们可以看到对文件解析的逻辑都在formy方法中。那么这个方法是怎么实现的呢。

话不多说,上代码

代码语言:javascript复制
function formy(ctx, opts) {
  return new Promise(function (resolve, reject) {
    var fields = {};
    var files = {};
    var form = new forms.IncomingForm(opts);
    form.on('end', function () {
      return resolve({
        fields: fields,
        files: files
      });
    }).on('error', function (err) {
      return reject(err);
    }).on('field', function (field, value) {
      if (fields[field]) {
        if (Array.isArray(fields[field])) {
          fields[field].push(value);
        } else {
          fields[field] = [fields[field], value];
        }
      } else {
        fields[field] = value;
      }
    }).on('file', function (field, file) {
      if (files[field]) {
        if (Array.isArray(files[field])) {
          files[field].push(file);
        } else {
          files[field] = [files[field], file];
        }
      } else {
        files[field] = file;
      }
    });
    if (opts.onFileBegin) {
      form.on('fileBegin', opts.onFileBegin);
    }
    form.parse(ctx.req);
  });
}

逻辑不多,我们再来总结一下。

  1. new 了一个 IncomingForm实例,传入formidable对应的配置
  2. 调用实例的parse方法,并监听enderrorfilefield等事件
  3. end事件的回调中,进行resolve

那么forms.IncomingForm是哪来的呢?

代码语言:javascript复制
const forms = require('formidable');

原来是koa-body引用的第三方依赖formidable

这下我们明确了,**ctx.request对象上附加的数据是在formidable.IncomingForm实例中进行处理,通过filefield等事件回调进行接收,最后在end事件回调中进行返回的**。

简单分析 formidable 源码

入口文件

通过前面的分析,我们知道了 koa-body 对于文件的处理是引用的 formidable。我们还是从入口文件进行分析。

入口代码非常简单,核心逻辑看来都在Formidable

Formidable.js 分析

先来对Formidable.js有一个宏观印象:

  1. 定义并导出了 IncomingForm
  2. IncomingForm继承了EventEmitter
  3. IncomingForm定义了诸多方法

通过继承EventEmitterIncomingForm实例可以在合适的时机触发相关事件并监听处理

parse 过程

回到上文,IncomingForm实例调用了parse方法。我们从parse方法开始入手分析。

通过红框处逻辑我们可以看到,parse方法的职责主要有两个:

  1. 解析请求header,设置parser
  2. 监听req参数的data事件,处理数据。

通过前面传入的参数,我们知道 req参数就是ctx.reqNode.js原生request对象

这下谜底又解开了一部分,**koa-body是怎么拿到上传的文件数据的呢?通过监听Node.js原生request对象的data事件**

write 过程

本节涉及到很多方法嵌套调用,我统称为write过程。

writeHeaders方法的调用链是这样的(代码比较长,大家感兴趣的可以自行去看源码):

writeHeaders() -> _parseContentType() -> initMultipart() -> MultipartParser

经过一串复杂的调用,我们完成了writeHeaders方法的使命:解析请求header,设置parser

MultipartParser 源码分析

我们可以看到,定义的MultipartParser类继承了Transform

我们简单看下官方文档对于Tranform流的定义

通过定义的_transform方法,我们可以对写进流的buffer进行处理

代码语言:javascript复制
  _transform(buffer, _, done) {
    let i = 0;
    let prevIndex = this.index;
    let { index, state, flags } = this;
    const { lookbehind, boundary, boundaryChars } = this;
    const boundaryLength = boundary.length;
    const boundaryEnd = boundaryLength - 1;
    this.bufferLength = buffer.length;
    let c = null;
    let cl = null;

    const setMark = (name, idx) => {
      this[`${name}Mark`] = typeof idx === 'number' ? idx : i;
    };

    const clearMarkSymbol = (name) => {
      delete this[`${name}Mark`];
    };

    const dataCallback = (name, shouldClear) => {
      const markSymbol = `${name}Mark`;
      if (!(markSymbol in this)) {
        return;
      }

      if (!shouldClear) {
        this._handleCallback(name, buffer, this[markSymbol], buffer.length);
        setMark(name, 0);
      } else {
        this._handleCallback(name, buffer, this[markSymbol], i);
        clearMarkSymbol(name);
      }
    };

    for (i = 0; i < this.bufferLength; i  ) {
      c = buffer[i];
      switch (state) {
        case STATE.PARSER_UNINITIALIZED:
          return i;
        case STATE.START:
          index = 0;
          state = STATE.START_BOUNDARY;
        case STATE.START_BOUNDARY:
          if (index === boundary.length - 2) {
            if (c === HYPHEN) {
              flags |= FBOUNDARY.LAST_BOUNDARY;
            } else if (c !== CR) {
              return i;
            }
            index  ;
            break;
          } else if (index - 1 === boundary.length - 2) {
            if (flags & FBOUNDARY.LAST_BOUNDARY && c === HYPHEN) {
              this._handleCallback('end');
              state = STATE.END;
              flags = 0;
            } else if (!(flags & FBOUNDARY.LAST_BOUNDARY) && c === LF) {
              index = 0;
              this._handleCallback('partBegin');
              state = STATE.HEADER_FIELD_START;
            } else {
              return i;
            }
            break;
          }

          if (c !== boundary[index   2]) {
            index = -2;
          }
          if (c === boundary[index   2]) {
            index  ;
          }
          break;
        case STATE.HEADER_FIELD_START:
          state = STATE.HEADER_FIELD;
          setMark('headerField');
          index = 0;
        case STATE.HEADER_FIELD:
          if (c === CR) {
            clearMarkSymbol('headerField');
            state = STATE.HEADERS_ALMOST_DONE;
            break;
          }

          index  ;
          if (c === HYPHEN) {
            break;
          }

          if (c === COLON) {
            if (index === 1) {
              // empty header field
              return i;
            }
            dataCallback('headerField', true);
            state = STATE.HEADER_VALUE_START;
            break;
          }

          cl = lower(c);
          if (cl < A || cl > Z) {
            return i;
          }
          break;
        case STATE.HEADER_VALUE_START:
          if (c === SPACE) {
            break;
          }

          setMark('headerValue');
          state = STATE.HEADER_VALUE;
        case STATE.HEADER_VALUE:
          if (c === CR) {
            dataCallback('headerValue', true);
            this._handleCallback('headerEnd');
            state = STATE.HEADER_VALUE_ALMOST_DONE;
          }
          break;
        case STATE.HEADER_VALUE_ALMOST_DONE:
          if (c !== LF) {
            return i;
          }
          state = STATE.HEADER_FIELD_START;
          break;
        case STATE.HEADERS_ALMOST_DONE:
          if (c !== LF) {
            return i;
          }

          this._handleCallback('headersEnd');
          state = STATE.PART_DATA_START;
          break;
        case STATE.PART_DATA_START:
          state = STATE.PART_DATA;
          setMark('partData');
        case STATE.PART_DATA:
          prevIndex = index;

          if (index === 0) {
            // boyer-moore derrived algorithm to safely skip non-boundary data
            i  = boundaryEnd;
            while (i < this.bufferLength && !(buffer[i] in boundaryChars)) {
              i  = boundaryLength;
            }
            i -= boundaryEnd;
            c = buffer[i];
          }

          if (index < boundary.length) {
            if (boundary[index] === c) {
              if (index === 0) {
                dataCallback('partData', true);
              }
              index  ;
            } else {
              index = 0;
            }
          } else if (index === boundary.length) {
            index  ;
            if (c === CR) {
              // CR = part boundary
              flags |= FBOUNDARY.PART_BOUNDARY;
            } else if (c === HYPHEN) {
              // HYPHEN = end boundary
              flags |= FBOUNDARY.LAST_BOUNDARY;
            } else {
              index = 0;
            }
          } else if (index - 1 === boundary.length) {
            if (flags & FBOUNDARY.PART_BOUNDARY) {
              index = 0;
              if (c === LF) {
                // unset the PART_BOUNDARY flag
                flags &= ~FBOUNDARY.PART_BOUNDARY;
                this._handleCallback('partEnd');
                this._handleCallback('partBegin');
                state = STATE.HEADER_FIELD_START;
                break;
              }
            } else if (flags & FBOUNDARY.LAST_BOUNDARY) {
              if (c === HYPHEN) {
                this._handleCallback('partEnd');
                this._handleCallback('end');
                state = STATE.END;
                flags = 0;
              } else {
                index = 0;
              }
            } else {
              index = 0;
            }
          }

          if (index > 0) {
            // when matching a possible boundary, keep a lookbehind reference
            // in case it turns out to be a false lead
            lookbehind[index - 1] = c;
          } else if (prevIndex > 0) {
            // if our boundary turned out to be rubbish, the captured lookbehind
            // belongs to partData
            this._handleCallback('partData', lookbehind, 0, prevIndex);
            prevIndex = 0;
            setMark('partData');

            // reconsider the current character even so it interrupted the sequence
            // it could be the beginning of a new sequence
            i--;
          }

          break;
        case STATE.END:
          break;
        default:
          return i;
      }
    }

    dataCallback('headerField');
    dataCallback('headerValue');
    dataCallback('partData');

    this.index = index;
    this.state = state;
    this.flags = flags;

    done();
    return this.bufferLength;
  }

_transform的逻辑也很简单:遍历buffer,在不同的阶段进行不同的处理并通过_handleCallback方法发出不同的事件进而触发对应的回调

注:_handleCallback方法实现很有意思,有兴趣的同学可以自己看下

parser.on('data')

上文我们说到,parser流被写入数据时,会调用_transform方法进行处理,同时触发各个周期事件的回调

事件回调的代码如图所示:

我们需要关注的是headersEnd事件,headsEnd事件的回调中,调用了IncomingForm实例的onPart方法

为什么说this.onPart调用的是IncomingForm实例的方法呢,可以看下前面的代码,有一步call绑定this的操作

在层层嵌套调用中,我们终于回到了IncomingForm的逻辑里,可喜可贺。

write 过程简单总结

我们再来回顾一下write过程:

  • this.writeHeaders(req.headers); - 设置parser
  • req.on('data') - 调用parserwrite方法,向流中写入数据
  • parser.on('headersEnd') - headers解析完毕,调用this.onPart方法

保存过程

_handlePart 方法源码分析

headers解析完毕之后,我们会调用this.onPart方法。而this.onPart方法的核心逻辑都在_handlePart方法中。接下来我们来对_handlePart源码进行分析

代码语言:javascript复制
  _handlePart(part) {
    if (part.originalFilename && typeof part.originalFilename !== 'string') {
      this._error(
        new FormidableError(
          `the part.originalFilename should be string when it exists`,
          errors.filenameNotString,
        ),
      );
      return;
    }

    if (!part.mimetype) {
      let value = '';
      const decoder = new StringDecoder(
        part.transferEncoding || this.options.encoding,
      );

      part.on('data', (buffer) => {
        this._fieldsSize  = buffer.length;
        if (this._fieldsSize > this.options.maxFieldsSize) {
          this._error(
            new FormidableError(
              `options.maxFieldsSize (${this.options.maxFieldsSize} bytes) exceeded, received ${this._fieldsSize} bytes of field data`,
              errors.maxFieldsSizeExceeded,
              413, // Payload Too Large
            ),
          );
          return;
        }
        value  = decoder.write(buffer);
      });

      part.on('end', () => {
        this.emit('field', part.name, value);
      });
      return;
    }

    if (!this.options.filter(part)) {
      return;
    }

    this._flushing  = 1;

    const newFilename = this._getNewName(part);
    const filepath = this._joinDirectoryName(newFilename);
    
    // new 了一个 File 实例
    const file = this._newFile({
      newFilename,
      filepath,
      originalFilename: part.originalFilename,
      mimetype: part.mimetype,
    });
    file.on('error', (err) => {
      this._error(err);
    });
    this.emit('fileBegin', part.name, file);

    // 调用 open 方法
    file.open();
    this.openedFiles.push(file);

    // 监听 data 时间
    part.on('data', (buffer) => {
      this._fileSize  = buffer.length;
      if (this._fileSize < this.options.minFileSize) {
        this._error(
          new FormidableError(
            `options.minFileSize (${this.options.minFileSize} bytes) inferior, received ${this._fileSize} bytes of file data`,
            errors.smallerThanMinFileSize,
            400,
          ),
        );
        return;
      }
      if (this._fileSize > this.options.maxFileSize) {
        this._error(
          new FormidableError(
            `options.maxFileSize (${this.options.maxFileSize} bytes) exceeded, received ${this._fileSize} bytes of file data`,
            errors.biggerThanMaxFileSize,
            413,
          ),
        );
        return;
      }
      if (buffer.length === 0) {
        return;
      }
      this.pause();
      
      // 写入 file
      file.write(buffer, () => {
        this.resume();
      });
    });

    // 监听 end 事件 
    part.on('end', () => {
      if (!this.options.allowEmptyFiles && this._fileSize === 0) {
        this._error(
          new FormidableError(
            `options.allowEmptyFiles is false, file size should be greather than 0`,
            errors.noEmptyFiles,
            400,
          ),
        );
        return;
      }

      // 关闭事件
      file.end(() => {
        this._flushing -= 1;
        this.emit('file', part.name, file);
        this._maybeEnd();
      });
    });
  }

通过我在代码关键处加的注释,我们可以看到_handlePart方法的核心逻辑如下:

  1. new 一个 File 对象的实例 file
  2. 开启 file
  3. 监听 partdata 事件,将数据写入 file
  4. 监听 partend 事件,关闭 file

在解读这段核心逻辑前,我们需要明确两个重要信息

  1. 传入的参数 part 是什么? part是在initMultipart方法中创建的可读流,通过data事件向外界传输数据
  1. File 对象的实例 file 是什么? 是根据传入的filePath创建的可写流

明白了这两个前提,这下我们终于明白了!

_handlePart方法就是打开一个文件流,把parser解析出的数据通过文件流写入文件,然后关闭

结束流程

_maybeEnd

我们的分析终于来到了尾声!

整体流程如何结束呢?

我们注意到有一个方法叫_maybeEnd当满足条件时,会触发end事件

我们选取this._flushing变量进行分析,如何满足条件。

可以看到,this._flushing初始值为0,当_handlePart方法执行时,this._flushing =1;当file关闭时,this._flushing-=1

由此我们可以分析出,当上传的文件都处理完毕时,this._flushing变量恢复为0,满足条件,触发end事件

大家看到end事件应该很兴奋,因为我们终于走到结束了。

真正结束

end事件被谁接收,相信大家心里已经有数了。

没错,我们回到了koa-body的代码中。后面的流程如下:

  1. formy方法返回的promise实例中监听到IncomingForm实例发出的end事件,promise实例进行resolve
  1. promise实例的then中接收到resolve返回的数据,附加到ctx.request对象上
  1. 中间件执行结束,调用next()

回答问题

本文带领大家从一个文件上传的例子入手,分析了koa-bodyformidable中关于处理文件上传的核心逻辑。

对于我们前面遗留的问题,相信大家此时已经有了答案。

简单回答,koa-body是如何处理文件上传的呢?

答:

  1. 通过req.on('data')获取数据
  2. 解析header,解析boundary
  3. 通过文件流写入本地文件中

分析总结

除了对koa-body文件上传流程有了清晰的了解之外,在整体探索分析过程中,我们还应该有一些别的收获,比如

  • 遇事不明,读源码
  • 对于一些依赖,源码与node_modules中下载的代码格式不同,二者对比阅读有奇效
  • 我们应该有使用流操作文件的意识
  • EventEmitter是通信神器,这种思路可以利用到业务代码中

0 人点赞