引言
Webpack Version 5 中对于任务调度实现了一套基于 AsyncQueue 的逻辑来管理各个任务之间的执行顺序。
深入研究了下,发现 Webpack 源代码中涉及到任务调度相关内容都会基于 AsyncQueue 来初始化队列,从而实现异步队列调用。
所谓 AsyncQueue 本质上就是 Webpack 开发任务自己实现的一款任务调度器,如果你有兴趣深入了解 Webpack ,那么理解 AsyncQueue 是你不可或缺的准备。
即使你暂时没有阅读源码的打算,我也会带你实现一款简单的 JS 任务调度器,合理的利用任务编排机制会为你的代码带来更加完整的逻辑处理以及更加高效的性能提升。
实现一款自定义的任务调度器函数不也是一件非常酷的事情吗。说不定哪天就用上了呢,对吧!
任务调度器
文章的开头我简单和大家聊一聊什么是任务调度器。
所谓调度器即是充当同一时间内对于多个任务进行分配,从而将任务有序列的调用执行。
我画了一张草图来辅助大家理解它的概念,假设此时 AsyncQueue 调度器同时最多支持处理两个并发任务。
此时我们拥有3个 Task 任务,每一个 Task 进入 AsyncQueue 时首先会判断当前是否可以立即执行,如果此时调度器已经拥堵那么新添加的任务将会进行排队等待,直至调度器释放出空闲那么前一个排队等待的任务才会继续执行。
当然这只是一个简单的示例,调度器同时支持处理多少个任务我们可以外部由外部参数决定而非固定数值。
简单来说,一个任务调度器拥有以下几个关键属性:
- 处理器函数,每个 Task 进入调度器都会经过该函数处理后得到返回结果。
- 并发数,同一时间调度器内部支持最多处理 Task 的个数。
这两个条件是调度器组成的基本内容,概念性的内容总是比较晦涩。没关系,接下来我们结合实际例子带你去看看它是如何在 Webpack 工作流中使用的。
大多数工具源代码中都存在任务编排的概念,在你开发一些自己的工具库时适当的利用任务调度机制会为你的工具带来更加优秀的性能和更加智能化的逻辑处理机制。
Webpack 中任务调度器
源码中的 AsyncQueue
上图为 webpack/lib/Compilation.js
的内容,左侧为 Webpack 4 右侧为 Webpack 5 。
上图我们可以清楚的看到版本 5 中对于 Compilation 上的一些实例属性全部通过了 new AsyncQueue 的形式来定义成为异步调度器队列。
在调度器中通过 processor 属性传入了对应的处理方法,使用 AsyncQueue 来管理内部的调度顺序。
Webpack parallelism 配置选项。
AsyncQueue 用法
我们看到在 Webpack 5 中大量运用了 AsyncQueue 来初始化 Compilation 中的实例属性。
AsyncQueue 本质上就是一款任务调度器,那么在 Webpack 中它是如何使用的呢,我们先来看一看它的用法。
代码语言:javascript复制const AsyncQueue = require('webpack/lib/util/AsyncQueue');
/**
*
* 处理器函数
* @param {*} item 需要传入里的item
* @param {*} callback 表示处理器完成的callback
*/
function processor(item, callback) {
setTimeout(() => {
item.number = Math.random();
callback(null, item);
},2000);
}
const queue = new AsyncQueue({
name: 'addNumber',
processor,
parallelism: 2,
getKey: (item) => item.key,
});
这里我们引入了 AsyncQueue 在 Webapck 中的源码实现,它存放在 webpack/lib/util/AsyncQueue.js
中。
同时我们通过 new AsyncQueue 实例化了一个 queue ,它接受以下参数:
- name 表示当前调度器的名称,这里我们给他起名为 addNumber 。
- processor 表示当前调度器的处理函数,也就是说 AsyncQueue 中每个任务都会基于 processor 处理函数来处理后得到输出结果。
- parallelism 表示当前 AsyncQueue 支持的并发任务数量。
- getKey 这是一个函数,通过该函数我们获得每一个入栈 Task 的唯一 key。
这里我们初始化了一个调度器实例对象 queue ,当然 Webpack 源码中还支持一系列的钩子函数以及其他属性。
这里属性对于我们理解 AsyncQueue 用处暂时不会很大,所以我们暂时抛开这些额外属性。有兴趣的同学可以自行打开 webpack/lib/util/AsyncQueue.js
来查看这些属性。
此时我们尝试往调度器中添加任务来看一看:
代码语言:javascript复制queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('item1处理后的结果', result);
});
queue.add({ key: 'item2', name: '19Qingfeng' }, (err, result) => {
console.log('item2处理后的结果');
});
queue.add({ key: 'item3', name: '19Qingfeng' }, (err, result) => {
console.log('item3处理后的结果');
});
我们通过 queue 向创建的调度器中添加了3条记录。
- 第一个参数表示需要处理的 item ,我们将会通过 AsyncQueue 中的 getKey 方法获取到每一个 item 的 key 属性从而通过该属性判断是否是重复添加的 item 。
- 至于其他参数就显得无关紧要了,我们只要保证需要处理的 item 中存在 getKey 方法需要的属性就可以了。
首先,前两个添加进入的 item1、item2 会加入调度器中立即调用,当 item3 加入调度器时因为我们设置的最大并行数量为 2 ,所以此时 item3 的加入会产生等待。
它需要等待已经在队列中的任务释放出空闲才可以执行接下来的任务。 代码上来说,即是当 item1、item2 加入队列会立即执行,此时 item3 在添加时会进入排队。
在 processor 处理 item1 完成后,调度器空闲 item3 才会进入调度器执行。
控制台会依次打印:
代码语言:javascript复制// 大约2s 处理完毕输出处理后结果
item1处理后的结果 { key: 'item1', name: '19Qingfeng', number: 0.9861293360853851 }
// 此时item1出队 item3进入执行 同时item2也会立马执行完毕
item2处理后的结果
// 大约4s后 item3会处理完成
item3处理后的结果
了解了 AsyncQueue 的基本用法之后,如果出现重复 key 会怎么办呢,实践出真知。我们来尝试一下:
代码语言:javascript复制queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('item1处理后的结果', result);
});
queue.add({ key: 'item2', name: '19Qingfeng' }, (err, result) => {
console.log('item2处理后的结果');
});
queue.add({ key: 'item3', name: '19Qingfeng' }, (err, result) => {
console.log('item3处理后的结果');
});
queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('ite1重复处理后的结果', result);
});
我们在之前的结果上额外添加了一个重复的 key 为 item1 的处理任务:
代码语言:javascript复制item1处理后的结果 { key: 'item1', name: '19Qingfeng', number: 0.3849850781468127 }
item1重复处理后的结果 { key: 'item1', name: '19Qingfeng', number: 0.3849850781468127 }
item2处理后的结果
item3处理后的结果
调度器内部会根据 getKey 方法的返回值来判断任务是否重复,如果任务重复那么添加的任务并不会被处理而是在之前已经添加过的重复任务执行完毕后,传入处理后的结果同时调用所有的完成回调函数。
实现任务调度器
上边我们谈到过 AsyncQueue 在 Webpack5 中的基础用法,这里我会完全将 AsyncQueue 和 Webpack 解耦,单独来聊聊如何实现一款任务调度器。
我们先从一张简单的图来入手调度器的流程,本质上每当我们调用 add 添加任务时都会往队列中添加任务同时通过 _willEnsureProcessing
属性防止一次 EventLoop 中多次执行队列调用函数。
我们会在本次 EventLoop 中收集入队的任务,并且通过 setImmediate 在下次 EventLoop 中迭代调度器调用需要执行的函数。
也许此时你仍然不太理解 _willEnsureProcessing 的作用,没关系,我们先来实现它。
AsyncQueue 在 Webpack 源码中是基于 tapable 的 hooks 来调用的,不过它的逻辑对我们来说没有多大作用,关于 tapable 的逻辑这里我们直接忽略它。
初始化参数
代码语言:javascript复制class AsyncQueue {
constructor(options) {
this.options = options;
// 名称
this.name = options.name;
// 处理器函数
this.processor = options.processor;
// 并发执行最大数
this.parallelism = options.parallelism || 100;
// 唯一标示函数
this.getKey = options.getKey;
// 保存当前队列中等执行的任务
this._queued = new ArrayQueue();
// 保存当前队列中所有已经执行过的任务
this._entries = new Map();
// 当前并发任务
this._activeTasks = 0;
// 是否开启下次事件队列EventLoop中等待执行的函数
this._willEnsureProcessing = false;
// 队列是否已经结束
this._stopped = false;
}
}
module.exports = AsyncQueue;
首先我们初始化了 AsyncQueue 的构造函数,在 AsyncQueue 初始化时我们保存了外部传入的参数。
同时定义了内部的私有属性:
this._queued
这是一个 ArrayQueue 的实例,在这个属性内部我们按照添加顺序保存即将要执行的所有任务。this._entry
这个属性中保存了所有进入调度器的任务,我们主要用它来判断添加的重复任务。this._activeTasks
它保存当前调度器并发的任务数。this._willEnsureProcessing
这个属性主要用于判断下一次 EventLoop 中是否已经开启调度器的执行,主要用于避免重复调用。this._stopped
这个属性用来判断当前队列是否被暂停。
ArrayQueue
在初始化参数阶段,我们通过 new ArrayQueue 创建了一个保存当前执行任务的队列。
代码语言:javascript复制class ArrayQueue {
constructor(items) {
this._list = items ? Array.from(items) : [];
}
// 入队
enqueue(item) {
this._list.push(item);
}
// 出队
dequeue() {
return this._list.shift();
}
}
ArrayQueue 内部的实现非常简单,他在内部维护一个 _list 属性, _list 内部存储排队等待的任务。
当调用 enqueue 时会在尾部添加一个 Task,当调用 dequeue 时会从首部弹出一个任务来执行。
Add Task
接下来,我们来看看每次往调度器中添加 Task 会发生什么:
代码语言:javascript复制 // ...
// 添加任务时
add(item, callback) {
if (this._stopped) {
return callback(new Error('Queue was stopped'));
}
// 获取当前添加的唯一key
const key = this.getKey(item);
// 创建一个新的entry对象
const newEntry = new AsyncQueueEntry(item, callback);
// 将entry添加进入this._entries
this._entries.set(key, newEntry);
// Task入队
this._queued.enqueue(newEntry);
// _willEnsureProcessing为false表示下次EventLoop中并不会调用调用器执行任务
// 当_willEnsureProcessing为false时我们需要在下一次EventLoop中执行调度器中的任务
// 并且将_willEnsureProcessing设置为true,防止本次EventLoop多次add造成下次EventLoop中多次重复执行任务
if (!this._willEnsureProcessing) {
this._willEnsureProcessing = true;
// 下一次EventLoop中调用_ensureProcessing执行调度器中的任务
setImmediate(this._ensureProcessing.bind(this));
}
}
// ...
首先在添加任务时,如果当前队列已经暂停那么会直接调用 add 方法传入的 callback 并且传递一个错误对象。
之后我们获得通过 this._getKey 获得唯一标识符,并且将 Add 添加的方法添加进入 this._entries 和 this._queued 中。
首先,这里有一个 AsyncQueueEntry 的类。我们利用 new AsyncQueueEntry 创建了一个 Task 实例,我们来看看它的实现:
代码语言:javascript复制// 还未执行
const QUEUED_STATE = 0;
// 正在处理
const PROCESSING_STATE = 1;
// 处理完成
const DONE_STATE = 2;
class AsyncQueueEntry {
constructor(item, callback) {
// 保存传入Task需要处理的值
this.item = item;
// 初始化状态
this.state = QUEUED_STATE;
// 保存传入Task完成的Callback
this.callback = callback;
// 用于重复Task的处理 我们会在稍微用到它
this.callbacks = undefined;
// 保存当前任务处理后的结果
this.result = undefined;
// 保存当前任务处理后的错误
this.error = undefined;
}
}
在 new AsyncEntry(item,callback) 中我们实例化了一个 Task 任务,定义了一系列属性来存储本次 Task 的相关信息,比如它的 callback、state 状态、result 结果等等之类。
当通过 add 方法往调度器添加一个 Task 时,我们通过 setImmediate 在下一次事件循环中调用 _ensureProcessing 方法来开启调度器执行队列。
比如我们上边的代码:
代码语言:javascript复制queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('item1处理后的结果', result);
});
queue.add({ key: 'item2', name: '19Qingfeng' }, (err, result) => {
console.log('item2处理后的结果');
});
queue.add({ key: 'item3', name: '19Qingfeng' }, (err, result) => {
console.log('item3处理后的结果');
});
我们在一次事件循环中调用了多次 queue.add 方法,如果没有 _willEnsureProcessing 的判断会导致每次调用 add 方法时都会再次 setImmediate(this._ensureProcessing.bind(this));
方法。
事实上,当本次事件循环中无论多少次调用 add 方法添加任务,我们仅需要调用一次 setImmediate(this._ensureProcessing.bind(this));
方法告诉调度器在下次事件循环中迭代任务队列进行执行就好了。
_ensureProcessing
接下来我们来看看 _ensureProcessing 这个方法,在添加完成 Task 后,开始执行时我们需要利用 _activeTasks 和 parallelism 来做判断,如果当前调度器并没有达到最大的并发那么就让最先添加的 Task 出队执行。
代码语言:javascript复制 // ...
// 迭代队列执行
_ensureProcessing() {
// 当还可以执行时
while (this._activeTasks < this.parallelism) {
// 获取最顶部排队任务
const entry = this._queued.dequeue();
// 如果已经没有任务了直接退出while循环
if (entry === undefined) break;
this._activeTasks ;
// 修改任务状态处理中
entry.state = PROCESSING_STATE;
this._startProcess(entry);
}
// 重置本次EventLoop中的_willEnsureProcessing为false
this._willEnsureProcessing = false
}
_startProcessing
上边我们迭代了 this._queued ,根据并发数要求取出队列中顶部的任务调用 startProcess 方法进行执行,此时我们来一起实现一下这个方法:
代码语言:javascript复制 // 处理Task
_startProcess(entry) {
this.processor(entry.item, (e, r) => {
if (e) {
this._handleResult(
entry,
new Error(`AsyncQueue(${this.name} processor error.)`)
);
}
this._handleResult(entry, e, r);
});
}
可以看到在 _startProcess 内部做的事情非常简单,通过调用初始化调度器时传入的 processor 函数得到处理后的结果。
所谓 _startProcessing 正如它的名字那样,它仅仅管理处理单个 Task 的逻辑,当 this._startProcess 处理完成后会将处理结果交 this._handleResult 函数来处理。
_handleResult
接下来我们一起来实现 _handleResult 方法:
代码语言:javascript复制 // 当Task处理完成时
_handleResult(entry, e, r) {
const callback = entry.callback;
entry.state = DONE_STATE;
entry.callback = undefined;
entry.result = r;
entry.error = e;
this._activeTasks--;
callback(e, r);
// 当调度器任务完成时
// 如果下一次EventLoop中并没有安排调度器执行
// 那么重置this._willEnsureProcessing状态 开启调度器执行
if (!this._willEnsureProcessing) {
this._willEnsureProcessing = true;
setImmediate(this._ensureProcessing);
}
}
当某个 Task 处理完成后,我们通过参数中传递 entry 参数获取对应的 Task 任务。
接下里做的事情就非常简单了,我们将处理后的结果以及对应的错误保存在 entry 中,同时让 this._activeTasks 并发数减一,调用 entry.callback 传入对应的结果表示处理完成。
同时,如果 _willEnsureProcessing 为 false 时,表示下一次 EventLoop 中并没有调用调度器执行。此时因为调度器执行完一个任务我们应该在下一次事件循环中继续调用调度器执行。
验证结果
整个代码一气呵成,其实它并不是很难,对吧。写到这里基础的处理逻辑已经完成了。针对上方的 Demo 我们利用这份自己实现的 AsyncQueue 来尝试运行下:
代码语言:javascript复制const AsyncQueue = require('./asyncQueue');
/**
*
* 处理器函数
* @param {*} item 需要传入里的item
* @param {*} callback 表示处理器完成的callback
*/
function processor(item, callback) {
setTimeout(() => {
item.number = Math.random();
callback(null, item);
}, 2000);
}
const queue = new AsyncQueue({
name: 'addNumber',
processor,
parallelism: 2,
getKey: (item) => item.key,
});
queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('item1处理后的结果', result);
});
queue.add({ key: 'item2', name: '19Qingfeng' }, (err, result) => {
console.log('item2处理后的结果');
});
queue.add({ key: 'item3', name: '19Qingfeng' }, (err, result) => {
console.log('item3处理后的结果');
});
检查控制台的输出如愿以偿的得到了我们想要的结果。但是此时我们遗漏了一个小问题,我们并没有对与重复的 item 进行判断。
换句话说,如果存在重复的 Item 时,此时调度器并不会判断重复的任务而是会将 key 重复的任务当作一个全新的任务去处理。我们可以来试试:
代码语言:javascript复制queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('item1处理后的结果', result);
});
queue.add({ key: 'item2', name: '19Qingfeng' }, (err, result) => {
console.log('item2处理后的结果');
});
queue.add({ key: 'item3', name: '19Qingfeng' }, (err, result) => {
console.log('item3处理后的结果');
});
// 此时我添加了一个重复的 key 为 item1 的任务
queue.add({ key: 'item1', name: '19Qingfeng' }, (err, result) => {
console.log('item1重复处理后的结果', err, result);
});
从打印结果中你可以发现,针对于重复的 item1 任务完全是当作了一个新的任务来处理。
这对于一个任务调度器来说,这样的逻辑是完全不合理的。我希望的是当存在重复的 key 值时,我会用上一个相同 key 的处理结果来调用重复的 callback 即可,完全没有必要重新在进入队列处理一次。
重复 Task 处理
借助于 webpack 中 AsyncQueue 的思路,我们可以轻松的实现这个功能。
或者你驻足思考下,你会用何种方式来处理重复 key 来达到想要我们想要的逻辑呢。
细心的同学可能会发现我们在 AsyncQueueEntry 构造函数中保留的 callbacks 属性并没有被用到,它正是我们解决问题的关键。
针对于重复的任务我们利用一个额外的 callbacks 参数来保存相同 Task 的不同回调函数不就解决了吗。
顺着这个思路,我们来试一试。
首先,我们回到 add 方法中:
代码语言:javascript复制 // 添加任务时
add(item, callback) {
if (this._stopped) {
return callback(new Error('Queue was stopped'));
}
// 获取当前添加的唯一key
const key = this.getKey(item);
// 如果存在重复的key
const entry = this._entries.get(key);
if (entry !== undefined) {
// 如果之前重复的Task已经执行完毕
if (entry.state === DONE_STATE) {
// 直接调用callback传入之前已经处理完成的结果
process.nextTick(() => callback(entry.error, entry.result));
} else if (entry.callbacks === undefined) {
entry.callbacks = [callback]
} else {
entry.callbacks.push(callback)
}
return;
}
// 创建一个新的entry对象
const newEntry = new AsyncQueueEntry(item, callback);
// 将entry添加进入this._entries
this._entries.set(key, newEntry);
// Task入队
this._queued.enqueue(newEntry);
// _willEnsureProcessing为false表示下次EventLoop中并不会调用调用器执行任务
// 当_willEnsureProcessing为false时我们需要在下一次EventLoop中执行调度器中的任务
// 并且将_willEnsureProcessing设置为true,防止本次EventLoop多次add造成下次EventLoop中多次重复执行任务
if (!this._willEnsureProcessing) {
this._willEnsureProcessing = true;
// 下一次EventLoop中调用_ensureProcessing执行调度器中的任务
setImmediate(this._ensureProcessing.bind(this));
}
}
这里我们对于 add 的逻辑进行了稍稍修改。在调用 add 方法时,我们首先从 this._entries 中检查之前是否已经添加过相同的 Task 。
如果存在相同的 Task ,此时会有三种情况:
- 如果之前重复的 Task 已经执行完毕,那么此时我们将本次重复添加的 callback 利用之前已经处理完成的结果调用就可以了。
- 如果当前 entry.callbacks === undefined 表示该 Task 首次重复,我们为 callbacks 进行初始化并且注入本次重复的 callback。
- 如果以上两种情况都不满足,那么表示该 key 对应的 Task 已经存在过重复且执行还未完成,那么我们往当前 entry 的 callbacks 参数中添加重复的 callback 等待 Task 执行完毕调用即可。
同样在修改了 add 逻辑之后我们还有修改 Task 执行完毕后的调用逻辑:
代码语言:javascript复制 // 当Task处理完成时
_handleResult(entry, e, r) {
const callback = entry.callback;
const callbacks = entry.callbacks;
entry.state = DONE_STATE;
entry.callback = undefined;
entry.callbacks = undefined;
entry.result = r;
entry.error = e;
this._activeTasks--;
callback(e, r);
if (callbacks !== undefined) {
for (const callback of callbacks) {
callback(e, r);
}
}
// 当调度器任务完成时
// 如果下一次EventLoop中并没有安排调度器执行
// 那么重置this._willEnsureProcessing状态 开启调度器执行
if (!this._willEnsureProcessing) {
this._willEnsureProcessing = true;
setImmediate(this._ensureProcessing.bind(this));
}
}
我们在 Task 执行完成后不仅仅会执行本次 Task 对应的 callback,同时会检查是否存在 Key 值重复的任务。
如果存在,我们也会通过一次处理函数的结果清空重复添加任务的回调函数,也就是 callbacks 属性。
此时我们再来调用上述存在重复的 Demo 来验证下输出结果:
此时针对于我们注入 key 为 item1 的重复任务,相同 key 的任务仅会被 processor 处理一次将处理后的结果传递给分别的回调函数中。
结尾
至此,基于 NodeJs 的一个简单任务调度器我们已经执行了。
照例,感谢每一位看到结尾的小伙伴。
有兴趣的了解 Webpack 更多知识的朋友可以关注我的专栏 从原理玩转 Webpack。