Web Workers RPC:Comlink 源码解析

2022-11-02 17:58:25 浏览数 (1)

上篇文章,有提及 Web Workers RPC 以解决浏览器不阻塞UI的问题,其中 comlink 是一把利器,本文就 comlink 的关键源码进行解析。

Comlink 通过提供 RPC 实现将基于 Worker.postMessage(someObject) 的 API 变成了对开发人员更友好的“类似本地调用”方式。

拆解源码之前,先介绍几个重要的概念:Proxy、Channel Messaging API、Transferable objects

注意:worker 创建完成后,每次通信都是新建 MessageChannel,目的是避免消息冲突。

重要概念

proxy
代码语言:javascript复制
new Proxy(target, handler)
  • target 被代理的对象
  • handler 被代理对象上的自定义行为

handler 处理函数

说明

get

劫持获取属性值

set

劫持设置属性值

apply

劫持函数调用

construct

劫持 new 操作符

apply

代码语言:javascript复制
function sum(a, b) {
  return a   b
}

const handler = {
  apply: function(target, thisArg, args) {
    return target(...args) * 10
  }
};

const proxy1 = new Proxy(sum, handler)

console.log(sum(1, 2))			// 3
console.log(proxy1(1, 2)) 	// 30

construct

代码语言:javascript复制
class P {
  constructor (name) {
    this.name = name
  }
  sayName () {
    console.log(this.name)
    return this.name
  }
}

const ProxyP = new Proxy(P, {
  construct (target, args) {
    return new target(...args)
  }
})
new ProxyP('LiGang').sayName()	// LiGang
Channel Messaging API

Channel Messaging API 允许两个不同的脚本运行在同一个文档的不同浏览器上下文(比如两个 iframe,或者文档主体和一个 iframe,使用 SharedWorker 的两个文档,或者两个 worker)来直接通讯,在每端使用一个端口(port)通过双向频道(channel)向彼此传递消息。

使用 MessageChannel() 构造函数来创建通讯信道。一旦创建,信道的两个端口即可通过 MessageChannel.port1MessageChannel.port2 属性进行访问(都会返回 MessagePort 对象)。

MessageChannel 以 DOM Event 的形式发送消息,所以它属于异步的宏任务。

示例:作为 EventEmitter 事件订阅发布使用,实现脚本间通信

代码语言:javascript复制
/* one.js */
export default function (port) { port.onmessage/port.postMessage }
/* two.js */
export default function (port) { port.onmessage/port.postMessage }

/* index.js */
import one from 'one.js'
import two from 'two.js'

const { port1, port2 } = new MessageChannel()
one(port1)
two(port2)

通过 Channel Messaging 进行通信,也可以完成 worker 和 worker 直接通信,无需主进程。

Transferable objects

可转移对象是拥有可以从一个上下文转移到另一个上下文的资源的对象,确保资源一次只能在一个上下文中可用。转移后,原始对象不再可用;它不再指向传输的资源,任何读取或写入对象的尝试都将引发异常。

可转移对象通常用于共享一次只能安全地暴露给单个 JavaScript 线程的资源。

支持的对象:ArrayBuffer、MessagePort、ReadableStream、TransformStream、AudioData、ImageBitmap 等

Channel Messaging API 的 MessageChannel 接口允许我们创建一个新的消息通道,并通过它的两个 MessagePort 属性(port1/port2)发送数据。

myWorker.postMessage(aMessage, transferList) transferList(可选):一个可选的Transferable对象的数组,用于传递所有权。如果一个对象的所有权被转移,在发送它的上下文中将变为不可用(中止),并且只有在它被发送到的 worker 中可用。

源码解析

  1. 通过 Proxy 对 wrap(worker) 劫持相关操作;
  2. 通过 ep(worker/MessageChannel)进行 on message 以及 postMessage 操作;
    • 基本类型:直接通过 worker 传递;
    • 非基本类型:需要通过 MessageChannel 传递port,进行 expose、wrap 处理
  3. 通过 toWireValue/fromWireValue 对通信原始数据处理
wrap

① createProxy(Proxy:get/set/apply/construct):创建后,会生成相应 Proxy ② processArguments : Proxy(get/set/apply/construct) 劫持处理 ③ toWireValue:对传入参数进行统一格式处理 [{type,name,value}, [transferables]] ④ serialize(MessageChannel)=> expose:创建 MessageChannel 通信管道、同时监听 worker 的返回(通过 expose,下面介绍) ⑤ requestResponseMessage(on message、postMessage):监听消息,同时发送当前信息

代码语言:javascript复制
// ①
function createProxy<T>(
  ep: Endpoint,
  path: (string | number | symbol)[] = [],
  target: object = function () {}
): Remote<T> {
	return  new Proxy(target, {
    get() {},
    set() {},
    apply() {},
    construct() {
      const [argumentList, transferables] = processArguments(rawArgumentList);
      // ⑤
      return requestResponseMessage(...).then(fromWireValue)
    }
  }    
}

// ②
function processArguments(argumentList: any[]): [WireValue[], Transferable[]] {
  const processed = argumentList.map(toWireValue);
  return [processed.map((v) => v[0]), myFlat(processed.map((v) => v[1]))];
}

// ③    
function toWireValue(value: any): [WireValue, Transferable[]] {
  for (const [name, handler] of transferHandlers) {
     const [serializedValue, transferables] = handler.serialize(value);
  }
}
   
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
  canHandle: (val): val is ProxyMarked =>
    isObject(val) && (val as ProxyMarked)[proxyMarker],
  // ④       
  serialize(obj) {
    const { port1, port2 } = new MessageChannel();
    expose(obj, port1);
    return [port2, [port2]];
  },
  deserialize(port) {
    port.start();
    return wrap(port);
  },
}    

// ⑤
function requestResponseMessage(): Promise<WireValue> {
  return new Promise((resolve) => {
    ep.addEventListener("message", function l(ev: MessageEvent) {
      resolve(ev.data);
    } as any);
    ep.postMessage({ id, ...msg }, transfers);
  });
}

关于 get 中 then 的特别说明:

根据 ECMAScript® 2022 Language Specification 中 await 的描述:

  • await value 在内部实现中会变成 await Promise.resolve(value)
  • 而 Promise.resolve 的处理中 则会获取 value.then 的值,如果它是一个函数则会通过它创建一个 Promise Job。

await value => await Promise.resolve(value) => await {then}

下述例子中 value 等于 success

代码语言:javascript复制
const value = await {
  then: (resolve, reject) => {
    resolve('success')
  }
}
expose

① on message:监听 message 事件 ② fromWireValue:对接受参数进行统一格式处理 ③ deserialize => wrap:发送消息对列、同时代理相关内容(通过 wrap,上面介绍) ④ GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE:针对不同MessageType,执行不同逻辑 ⑤ returnValue :依据分支 ④ 产生返回结果 ⑥ toWireValue ⑦ serialize(MessageChannel) ⑧ postMessage(transferables):发送

代码语言:javascript复制
export function expose(obj: any, ep: Endpoint = self as any) {
  // ①
  ep.addEventListener("message", function callback(ev: MessageEvent) {
    // ② ③
    const argumentList = (ev.data.argumentList || []).map(fromWireValue);
    switch (type) {
      // ④  
    	case GET/SET/APPLY/CONSTRUCT/ENDPONIT/RELEASE
        // ⑤
      	returnValue = ...  
    }
    Promise.resolve(returnValue)
      .catch((value) => {
      return { value, [throwMarker]: 0 };
    }).then((returnValue) => {
      // ⑥ ⑦ 
      const [wireValue, transferables] = toWireValue(returnValue);
      // ⑧
      ep.postMessage({ ...wireValue, id }, transferables);
    });
  }
}

// ②
function fromWireValue(value: WireValue): any {
  switch (value.type) {
    case WireValueType.HANDLER:
      // ③
      return transferHandlers.get(value.name)!.deserialize(value.value);
    case WireValueType.RAW:
      return value.value;
  }
}   
  
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
  deserialize(port) {
    port.start();
    // 执行 wrap 流程
    return wrap(port);
  },
}   

使用

如果大家项目中需要使用 webWorker,强烈推荐大家尝试 Comlink,Comlink 同项目结合,可以使用 comlink-loader

0 人点赞