故事背景
某天,团队的新来的很爱问问题的小伙伴突然问我:怎么在Koa服务中处理接收上传的文件数据?
我答:这个简单,你在koa-body
里配一下multipart
,然后在ctx.request.files
取一下
他又问:好的。为什么配置了multipart
就可以在ctx.request.files
拿到呢?
我又答:因为koa-body
帮你处理了
他再问:好的。那它是怎么处理的呢?
我:**...我去看下源码再来回答你**
梳理思路
通过前面刨根问底的对话,我们可以提炼出三个问题:
- WHAT:通过什么能使Koa解析文件上传呢?
答:
koa-body
- HOW:如何进行配置呢?
答:开启
multipart
配置(注:更多细节在formidable
配置中) - WHY:为什么 koa-body 可以解析上传的文件,它是什么时候在 ctx 上增加的属性呢?
前两个问题,我们现在已经有了答案。作为一个开发者,在掌握了WHAT和HOW之后,我们更应该沉下心来,对WHY背后的原理进行探索。要知其然更知其所以然。
那么,对于这类原理性的问题,我们的思路不用多说:看源码
简单分析 koa-body 源码
入口文件
我们分析NPM依赖肯定要从入口文件进行分析,既从package.json
中的main
字段开始,一般来说都是index.js
根据Koa.js
的中间件实现规范,我们可以从上面的代码中了解到:
requestBody
方法返回的function
才是是真正执行的中间件- 服务启动时,
requestBody
方法会初始化相关配置
requestBody 返回中间件实现
接下来我们来看中间件真正实现的代码。
由于屏幕原因我对一些无关的代码进行了折叠,我们重点看红框处。
- 当
opts.multipart
(配置验证)与ctx.is('multipart')
(请求头验证)都为true
时,判断为文件上传场景,调用formy
方法 - 当
formy
方法返回的promise
实例resolved
时,将promise
实例返回的数据附加到ctx.request.body
及ctx.request.files
上
这下WHY的后半部分谜底解开了:在真实处理逻辑的promise
实例resolved后,koa-body
会将返回的数据附加在ctx.request
上
formy 方法实现
通过上一部分的截图,我们可以看到对文件解析的逻辑都在formy
方法中。那么这个方法是怎么实现的呢。
话不多说,上代码
代码语言:javascript复制function formy(ctx, opts) {
return new Promise(function (resolve, reject) {
var fields = {};
var files = {};
var form = new forms.IncomingForm(opts);
form.on('end', function () {
return resolve({
fields: fields,
files: files
});
}).on('error', function (err) {
return reject(err);
}).on('field', function (field, value) {
if (fields[field]) {
if (Array.isArray(fields[field])) {
fields[field].push(value);
} else {
fields[field] = [fields[field], value];
}
} else {
fields[field] = value;
}
}).on('file', function (field, file) {
if (files[field]) {
if (Array.isArray(files[field])) {
files[field].push(file);
} else {
files[field] = [files[field], file];
}
} else {
files[field] = file;
}
});
if (opts.onFileBegin) {
form.on('fileBegin', opts.onFileBegin);
}
form.parse(ctx.req);
});
}
逻辑不多,我们再来总结一下。
- new 了一个
IncomingForm
实例,传入formidable
对应的配置 - 调用实例的
parse
方法,并监听end
、error
、file
、field
等事件 - 在
end
事件的回调中,进行resolve
那么forms.IncomingForm
是哪来的呢?
const forms = require('formidable');
原来是koa-body
引用的第三方依赖formidable
这下我们明确了,**ctx.request
对象上附加的数据是在formidable.IncomingForm
实例中进行处理,通过file
、field
等事件回调进行接收,最后在end
事件回调中进行返回的**。
简单分析 formidable 源码
入口文件
通过前面的分析,我们知道了 koa-body
对于文件的处理是引用的 formidable
。我们还是从入口文件进行分析。
入口代码非常简单,核心逻辑看来都在Formidable
中
Formidable.js 分析
先来对Formidable.js
有一个宏观印象:
- 定义并导出了
IncomingForm
类 IncomingForm
继承了EventEmitter
IncomingForm
定义了诸多方法
通过继承EventEmitter
,IncomingForm
实例可以在合适的时机触发相关事件并监听处理
parse 过程
回到上文,IncomingForm
实例调用了parse
方法。我们从parse
方法开始入手分析。
通过红框处逻辑我们可以看到,parse
方法的职责主要有两个:
- 解析请求header,设置parser
- 监听
req
参数的data
事件,处理数据。
通过前面传入的参数,我们知道 req
参数就是ctx.req
,Node.js
原生request
对象。
这下谜底又解开了一部分,**koa-body
是怎么拿到上传的文件数据的呢?通过监听Node.js
原生request
对象的data
事件**
write 过程
本节涉及到很多方法嵌套调用,我统称为write
过程。
writeHeaders
方法的调用链是这样的(代码比较长,大家感兴趣的可以自行去看源码):
writeHeaders()
-> _parseContentType()
-> initMultipart()
-> MultipartParser
经过一串复杂的调用,我们完成了writeHeaders
方法的使命:解析请求header,设置parser
MultipartParser 源码分析
我们可以看到,定义的MultipartParser
类继承了Transform
流
我们简单看下官方文档对于Tranform
流的定义
通过定义的_transform
方法,我们可以对写进流的buffer
进行处理
_transform(buffer, _, done) {
let i = 0;
let prevIndex = this.index;
let { index, state, flags } = this;
const { lookbehind, boundary, boundaryChars } = this;
const boundaryLength = boundary.length;
const boundaryEnd = boundaryLength - 1;
this.bufferLength = buffer.length;
let c = null;
let cl = null;
const setMark = (name, idx) => {
this[`${name}Mark`] = typeof idx === 'number' ? idx : i;
};
const clearMarkSymbol = (name) => {
delete this[`${name}Mark`];
};
const dataCallback = (name, shouldClear) => {
const markSymbol = `${name}Mark`;
if (!(markSymbol in this)) {
return;
}
if (!shouldClear) {
this._handleCallback(name, buffer, this[markSymbol], buffer.length);
setMark(name, 0);
} else {
this._handleCallback(name, buffer, this[markSymbol], i);
clearMarkSymbol(name);
}
};
for (i = 0; i < this.bufferLength; i ) {
c = buffer[i];
switch (state) {
case STATE.PARSER_UNINITIALIZED:
return i;
case STATE.START:
index = 0;
state = STATE.START_BOUNDARY;
case STATE.START_BOUNDARY:
if (index === boundary.length - 2) {
if (c === HYPHEN) {
flags |= FBOUNDARY.LAST_BOUNDARY;
} else if (c !== CR) {
return i;
}
index ;
break;
} else if (index - 1 === boundary.length - 2) {
if (flags & FBOUNDARY.LAST_BOUNDARY && c === HYPHEN) {
this._handleCallback('end');
state = STATE.END;
flags = 0;
} else if (!(flags & FBOUNDARY.LAST_BOUNDARY) && c === LF) {
index = 0;
this._handleCallback('partBegin');
state = STATE.HEADER_FIELD_START;
} else {
return i;
}
break;
}
if (c !== boundary[index 2]) {
index = -2;
}
if (c === boundary[index 2]) {
index ;
}
break;
case STATE.HEADER_FIELD_START:
state = STATE.HEADER_FIELD;
setMark('headerField');
index = 0;
case STATE.HEADER_FIELD:
if (c === CR) {
clearMarkSymbol('headerField');
state = STATE.HEADERS_ALMOST_DONE;
break;
}
index ;
if (c === HYPHEN) {
break;
}
if (c === COLON) {
if (index === 1) {
// empty header field
return i;
}
dataCallback('headerField', true);
state = STATE.HEADER_VALUE_START;
break;
}
cl = lower(c);
if (cl < A || cl > Z) {
return i;
}
break;
case STATE.HEADER_VALUE_START:
if (c === SPACE) {
break;
}
setMark('headerValue');
state = STATE.HEADER_VALUE;
case STATE.HEADER_VALUE:
if (c === CR) {
dataCallback('headerValue', true);
this._handleCallback('headerEnd');
state = STATE.HEADER_VALUE_ALMOST_DONE;
}
break;
case STATE.HEADER_VALUE_ALMOST_DONE:
if (c !== LF) {
return i;
}
state = STATE.HEADER_FIELD_START;
break;
case STATE.HEADERS_ALMOST_DONE:
if (c !== LF) {
return i;
}
this._handleCallback('headersEnd');
state = STATE.PART_DATA_START;
break;
case STATE.PART_DATA_START:
state = STATE.PART_DATA;
setMark('partData');
case STATE.PART_DATA:
prevIndex = index;
if (index === 0) {
// boyer-moore derrived algorithm to safely skip non-boundary data
i = boundaryEnd;
while (i < this.bufferLength && !(buffer[i] in boundaryChars)) {
i = boundaryLength;
}
i -= boundaryEnd;
c = buffer[i];
}
if (index < boundary.length) {
if (boundary[index] === c) {
if (index === 0) {
dataCallback('partData', true);
}
index ;
} else {
index = 0;
}
} else if (index === boundary.length) {
index ;
if (c === CR) {
// CR = part boundary
flags |= FBOUNDARY.PART_BOUNDARY;
} else if (c === HYPHEN) {
// HYPHEN = end boundary
flags |= FBOUNDARY.LAST_BOUNDARY;
} else {
index = 0;
}
} else if (index - 1 === boundary.length) {
if (flags & FBOUNDARY.PART_BOUNDARY) {
index = 0;
if (c === LF) {
// unset the PART_BOUNDARY flag
flags &= ~FBOUNDARY.PART_BOUNDARY;
this._handleCallback('partEnd');
this._handleCallback('partBegin');
state = STATE.HEADER_FIELD_START;
break;
}
} else if (flags & FBOUNDARY.LAST_BOUNDARY) {
if (c === HYPHEN) {
this._handleCallback('partEnd');
this._handleCallback('end');
state = STATE.END;
flags = 0;
} else {
index = 0;
}
} else {
index = 0;
}
}
if (index > 0) {
// when matching a possible boundary, keep a lookbehind reference
// in case it turns out to be a false lead
lookbehind[index - 1] = c;
} else if (prevIndex > 0) {
// if our boundary turned out to be rubbish, the captured lookbehind
// belongs to partData
this._handleCallback('partData', lookbehind, 0, prevIndex);
prevIndex = 0;
setMark('partData');
// reconsider the current character even so it interrupted the sequence
// it could be the beginning of a new sequence
i--;
}
break;
case STATE.END:
break;
default:
return i;
}
}
dataCallback('headerField');
dataCallback('headerValue');
dataCallback('partData');
this.index = index;
this.state = state;
this.flags = flags;
done();
return this.bufferLength;
}
_transform
的逻辑也很简单:遍历buffer
,在不同的阶段进行不同的处理并通过_handleCallback
方法发出不同的事件进而触发对应的回调。
注:_handleCallback
方法实现很有意思,有兴趣的同学可以自己看下
parser.on('data')
上文我们说到,当parser
流被写入数据时,会调用_transform
方法进行处理,同时触发各个周期事件的回调。
事件回调的代码如图所示:
我们需要关注的是headersEnd
事件,在headsEnd
事件的回调中,调用了IncomingForm
实例的onPart
方法
为什么说this.onPart
调用的是IncomingForm
实例的方法呢,可以看下前面的代码,有一步call
绑定this
的操作
在层层嵌套调用中,我们终于回到了IncomingForm
的逻辑里,可喜可贺。
write 过程简单总结
我们再来回顾一下write
过程:
this.writeHeaders(req.headers);
- 设置parser
req.on('data')
- 调用parser
的write
方法,向流中写入数据parser.on('headersEnd')
-headers
解析完毕,调用this.onPart
方法
保存过程
_handlePart
方法源码分析
当headers
解析完毕之后,我们会调用this.onPart
方法。而this.onPart
方法的核心逻辑都在_handlePart
方法中。接下来我们来对_handlePart
源码进行分析
_handlePart(part) {
if (part.originalFilename && typeof part.originalFilename !== 'string') {
this._error(
new FormidableError(
`the part.originalFilename should be string when it exists`,
errors.filenameNotString,
),
);
return;
}
if (!part.mimetype) {
let value = '';
const decoder = new StringDecoder(
part.transferEncoding || this.options.encoding,
);
part.on('data', (buffer) => {
this._fieldsSize = buffer.length;
if (this._fieldsSize > this.options.maxFieldsSize) {
this._error(
new FormidableError(
`options.maxFieldsSize (${this.options.maxFieldsSize} bytes) exceeded, received ${this._fieldsSize} bytes of field data`,
errors.maxFieldsSizeExceeded,
413, // Payload Too Large
),
);
return;
}
value = decoder.write(buffer);
});
part.on('end', () => {
this.emit('field', part.name, value);
});
return;
}
if (!this.options.filter(part)) {
return;
}
this._flushing = 1;
const newFilename = this._getNewName(part);
const filepath = this._joinDirectoryName(newFilename);
// new 了一个 File 实例
const file = this._newFile({
newFilename,
filepath,
originalFilename: part.originalFilename,
mimetype: part.mimetype,
});
file.on('error', (err) => {
this._error(err);
});
this.emit('fileBegin', part.name, file);
// 调用 open 方法
file.open();
this.openedFiles.push(file);
// 监听 data 时间
part.on('data', (buffer) => {
this._fileSize = buffer.length;
if (this._fileSize < this.options.minFileSize) {
this._error(
new FormidableError(
`options.minFileSize (${this.options.minFileSize} bytes) inferior, received ${this._fileSize} bytes of file data`,
errors.smallerThanMinFileSize,
400,
),
);
return;
}
if (this._fileSize > this.options.maxFileSize) {
this._error(
new FormidableError(
`options.maxFileSize (${this.options.maxFileSize} bytes) exceeded, received ${this._fileSize} bytes of file data`,
errors.biggerThanMaxFileSize,
413,
),
);
return;
}
if (buffer.length === 0) {
return;
}
this.pause();
// 写入 file
file.write(buffer, () => {
this.resume();
});
});
// 监听 end 事件
part.on('end', () => {
if (!this.options.allowEmptyFiles && this._fileSize === 0) {
this._error(
new FormidableError(
`options.allowEmptyFiles is false, file size should be greather than 0`,
errors.noEmptyFiles,
400,
),
);
return;
}
// 关闭事件
file.end(() => {
this._flushing -= 1;
this.emit('file', part.name, file);
this._maybeEnd();
});
});
}
通过我在代码关键处加的注释,我们可以看到_handlePart
方法的核心逻辑如下:
new
一个File
对象的实例file
- 开启
file
- 监听
part
的data
事件,将数据写入file
中 - 监听
part
的end
事件,关闭file
在解读这段核心逻辑前,我们需要明确两个重要信息:
- 传入的参数
part
是什么?part
是在initMultipart
方法中创建的可读流,通过data
事件向外界传输数据
File
对象的实例file
是什么? 是根据传入的filePath
创建的可写流
明白了这两个前提,这下我们终于明白了!
_handlePart
方法就是打开一个文件流,把parser
解析出的数据通过文件流写入文件,然后关闭。
结束流程
_maybeEnd
我们的分析终于来到了尾声!
整体流程如何结束呢?
我们注意到有一个方法叫_maybeEnd
,当满足条件时,会触发end
事件
我们选取this._flushing
变量进行分析,如何满足条件。
可以看到,this._flushing
初始值为0,当_handlePart
方法执行时,this._flushing =1
;当file
关闭时,this._flushing-=1
。
由此我们可以分析出,当上传的文件都处理完毕时,this._flushing
变量恢复为0,满足条件,触发end
事件
大家看到end
事件应该很兴奋,因为我们终于走到结束了。
真正结束
end
事件被谁接收,相信大家心里已经有数了。
没错,我们回到了koa-body
的代码中。后面的流程如下:
- 在
formy
方法返回的promise
实例中监听到IncomingForm
实例发出的end
事件,promise
实例进行resolve
- 在
promise
实例的then
中接收到resolve
返回的数据,附加到ctx.request
对象上
- 中间件执行结束,调用
next()
回答问题
本文带领大家从一个文件上传的例子入手,分析了koa-body
及formidable
中关于处理文件上传的核心逻辑。
对于我们前面遗留的问题,相信大家此时已经有了答案。
简单回答,koa-body是如何处理文件上传的呢?
答:
- 通过
req.on('data')
获取数据 - 解析
header
,解析boundary
- 通过文件流写入本地文件中
分析总结
除了对koa-body
文件上传流程有了清晰的了解之外,在整体探索分析过程中,我们还应该有一些别的收获,比如
- 遇事不明,读源码
- 对于一些依赖,源码与
node_modules
中下载的代码格式不同,二者对比阅读有奇效 - 我们应该有使用流操作文件的意识
EventEmitter
是通信神器,这种思路可以利用到业务代码中