优雅的实现多线程环境下的协程调度 - 再谈 ASIO 与 Coroutine

2023-10-16 15:31:58 浏览数 (1)

[!info] 导语: 在先前的文章《从无栈协程到C 异步框架》中,我们探讨了如何将上层的协程调度器与底层的C 17协程实现以及C 20协程实现相结合,从而构建一个在单线程环境下易于使用的异步框架。通过相关示例,我们发现协程在表达线性类型业务方面具有显著优势。那么,在多线程环境下,当单个协程的执行不再受限于单一线程时,我们能否继续保持这种线性类型业务的友好表达,并在多线程环境中充分利用协程的优势呢?本篇文章将致力于解决这一核心问题。

1. 单线程环境下的 coroutine

我们先来重温一下单线程环境下的一些基本的设计和概念, 在上一篇中, 我们提到了实际业务中一个coroutine的基本表达模式如下:

(以C 20为例)

上层的调度器实现基本结构如下图所示:

我们在Scheduler中会将对应的coroutine创建为SchedTask, 然后在Scheduler中维护各类不同作用的SchedTask队列, 如用于立即唤醒的immediate_queue, 用于下一帧唤醒的nextframe_queue, 以及用于等待业务唤醒的wait_event_queue. 一个协程大致有以下几种唤醒的方式: - 处于immediate_queue中: 当前正在执行immediate_queue的情况下, 会被立即唤醒执行 - 处于nextframe_queue中: 当执行Scheduler::Update() 时会被唤醒执行 - 处于wait_event_queue中: 业务调用相应SchedTaskAwake()来恢复执行 这种机制在单一线程的情况下, 是能够很好的工作的, 业务侧如 IO 和 Network 等自己处理好相关的异步逻辑, 在主线程对相关的 coroutine 进行唤醒, 就能很好的完成相关的工作了. 通过引入一个中间层的 coroutine 调度器, 我们很好的达成了以下目标: - 原有实现 "库作者向" -> 通过协程调度器对业务隔离特性复杂度, 业务使用简洁. - 协程无管理状态自由使用 -> 有集中的地方对系统中协程的整体运行状态做管理和监控. - 对象生命周期的手动管理 -> C 20 compiler 自动处理栈变量, 低心智负担的业务开发模式.

但很多时候我们也会不可避免的面临多线程的环境, 很显然, 原来的这套 coroutine && Scheduler 实现对于多线程来说, 是没法简单平移使用的. 而笔者在给我们 CrossEngine 添加协程支持的时候, 就碰到了这种情况.

在开始解决具体的多线程 coroutine 调度问题前, 我们先来以 ASIO Lambda Post 为例, 回顾多线程对通用任务调度相关的知识, 后续再来讨论协程相关的内容.


2. ASIO 多线程调度 - lambda post 应用介绍

尽管我们通常将ASIO作为网络库使用,但实际上,它在支持通用任务调度方面也表现出色。借助C 11引入的lambda和函数对象,我们可以将通用任务包装成lambda,然后使用post()方法将其提交到某个io_context上, 整个任务派发的过程也是现在众多游戏引擎所使用的lambda post式的异步任务派发机制。大体的过程如下图所示:

我们一般是通过io_context内的scheduler implpost(), dispatch(), defer()这三个方法之一将业务侧的lambda传递给asio, asio会将对应的lambda存储为一个operation, 也就是一个任务, 而具体的operation最后会被执行io_context::run()的线程所执行.

[!hint] 需要注意的是asio没有使用句柄式的方式对operation进行管理, 在需要返回值的情况下, 是通过额外的async_result的模板来完成异步传值等操作的. 下文中我们会对async_result做简单的介绍.

2.1 项目应用实例简介

ASIO所使用调度器本身就是一个很通用的lambda post机制, 所以将ASIO作为通用的并发框架当然也是切实可行的。实际上,网易的许多项目都采用了这种方法。最初是他们的服务器将ASIO作为底层并发框架,后来知名度较高的Messiah引擎也借鉴和发扬了这种方式,将ASIO作为底层基础的并发框架。

当然, 实际项目的使用中一般会将ASIO作简单的包装, 为了方便大家的理解, 这里直接以笔者所在的CrossEngine项目举例(CrossEngine是一个游戏引擎, 下文我们简称CE), 方便大家理解如何将ASIO用作通用的异步调度器的.

2.1.1 隔离式的ASIO使用

游戏引擎中一般会涉及到多个线程之间的任务调度, 下图是CE框架层中的asio::io_context与线程的关系和分组:

JobSystem图

整体的封装是比较简洁的: 1. 外围的JobSystem负责对所有的JobSlot进行管理 2. 每个JobSlot一一对应一个asio::context 3. 每个JobSlot会创建一组线程池用于其关联的asio::io_context的任务的调度, 也就是每个线程调用io_context::run()来执行投递来的任务. 4. 主线程(逻辑线程)是比较特殊的存在, 我们一般是使用手动驱动其工作的模式. 5. 业务侧使用JobType枚举来选择对应的asio::io_context来进行任务的投递, 这样就对业务侧适当隔离了asio本身, 枚举也易于记忆和使用.

2.1.2 JobType 简介

JobType 本身也是一种业务侧对任务进行分组的方式, 不同的 JobType 对应的是某一类粒度或者业务特性相近的任务, 如 kWorkJob, 对应的是一组工作线程, 我们希望在其上执行的任务粒度都是非常小的, 这样在有很多任务被投递到工作线程上的时候, 它们可以很好的并发, 而不是出现长时间等待另外一个任务完成后才能被调度的情况.

具体在CE框架层中对应JobType的定义如下:

代码语言:javascript复制
enum class JobType : int {
  kLogicJob = 0,       // logic thread(main thread)
  kWorkJob,            // work thread
  kSlowJob,            // slow work thread(run io or other slow job)
  kNetworkJob,         // add a separate thread for network
  kNetworkConnectJob,  // extra connect thread for network
  kLogJob,             // log thread
  kNotifyExternalJob,  // use external process to report something, 1 thread only~~
  kTotalJobTypes,
};

JobType的具体使用是: - kLogicJob - 主线程(逻辑线程)执行任务 - kWorkJob - Work Thread线程池执行任务(多个), 一般是计算量可控的小任务 - kSlowJob - IO专用线程池, IO相关的任务投递到本线程池 - kNetworkJob - 目前tbuspp专用的处理线程 - kNetworkConnectJob - 专用的网络连接线程, tbuspp模式下不需要 - kLogJob - 日志专用线程, 目前日志模块是自己起的线程, 可以归并到此处管理 - kNotifyExternalJob** - 专用的通知线程, 如lua error的上报, 使用该类型

2.1.3 一个简单的文件异步读取示例

对于一个简单的异步任务, 它可能的执行状态是先在某个线程上做阻塞式的执行, 然后再回归主线程进行回调, 如下图所示:

代码语言:javascript复制
sequenceDiagram
    Logic Job ->> Work Job: calculate task
    Work Job ->>-Logic Job: calculate result

这里我们给出CE中的异步文件读取代码为例:

代码语言:javascript复制
auto ticket = GJobSystem->RequestTicket();
  auto fullPath = GetFullPath(relPath);
  GJobSystem->Post(
      [this, ticket, relPath, fullPath, loadFunc]() {
        ByteBufferPtr outBuf;
        try {
          // ... Code read file from system to outBuf ignore here.
        } catch (std::exception& ex) {
          ERR_DEF("Read file failed, name:%s, err:%s", fullPath.c_str(), ex.what());
        }

        GJobSystem->Post(
            [outBuf, ticket, relPath, loadFunc]() {
              if (ticket) {
                loadFunc(ticket, relPath, "", outBuf);
              }
            },
            JobType::kLogicJob);
      },
      JobType::kSlowJob);

  return ticket;

我们用两次Post()完成了文件的异步读取: 1. 第一次Post()后的任务会在kSlowJob上执行, 最后会被投递到JobSystem图上的两个Slow Thread之一进行执行. 2. 在完成文件的IO后, 会进行第二次的Post(), 将文件读取的结果投递给主线程, 在主线程回调相关的callback.

2.1.4 流水线式任务的示例

在CE中, 结合对asio::strand的封装, 对于下图中的流水线式任务:

代码语言:javascript复制
sequenceDiagram
    participant L as Logic Job
    participant W1 as Work Job1
    participant W2 as Work Job2
    participant W3 as Work Job3
    L ->>W1: part 1
    activate W1
    W1 ->>W2: part 2
    deactivate W1
    activate W2
    W2 ->>W3: part 3
    deactivate W2
    activate W3
    W3 ->>W2: part 4
    deactivate W3
    activate W2
    W2 ->>L: return
    deactivate W2

我们直接使用代码:

代码语言:javascript复制
auto strand = GJobSystem->request_strand(gbf::JobType::kWorkJob);
starnd.post([](){ 
    //part1~
    // ...
});
starnd.post([](){ 
    //part2~
    // ...
});
starnd.post([](){ 
    //part3~ 
    // ...
});
starnd.post([](){ 
    //part4~ 
    // ...
});
starnd.post([](){ 
    GJobSystem->post([](){
        //return code here
        // ...
    }, gbf::JobType::kLogicJob); 
});

就完成了这类链式任务的实现, 这样也能避免让具体的业务关注过于底层的复杂设计.

2.1.5 lambda post小议

对于lambda post类型的JobSystem实现来说, 整体设计上都是大同小异的, 可能差别比较多的地方主要体现在这两处: 1. 线程池的表达, 像CE这种是比较简约的设计, 某个线程创建后, 它对应执行的任务类型就被固定下来了, 但部分引擎如Halo, 使用的是更具公用性的线程, 一个线程可以对某几类任务进行调度. 后者的设计实现更紧凑, 间接可以实现减少总线程数, 那肯定也意味着更低的thread context switch了, 但底层的任务获取也会相对更复杂一些. 2. 依赖asio::strand这类设施, 我们能够补齐多工作线程上的线性表达能力, 但对于更复杂的DAG类型的组合任务表达, 每个引擎可能都会有自己差异化的实现. 本系列主要关注的是asio本身, 这部分暂时不进行展开了.


3. 多线程环境下协程实现面临的挑战

CE 底层实现了类似上文 ASIO lambda post 的机制, 并进一步提供了对异步任务的 DAG 支持. 目前项目希望引入协程对其中一部分代码的实现进行重构, 以使其有更简单的实现, 同时 CE 本身也是运行在前面所说的固定线程池状态下的:

JobSystem图

这种情况下, 我们想引入协程, 初步考虑, 有以下这些方案可供选择.


3.1 解决问题的思路 - 方案A

前文<<从无栈协程到 C 异步框架>>中我们提到过, 我们已经有一版在单一线程下工作良好的 coroutine 封装了, 那么最简单的想法, 我们直接使用这一版实现, 进行适当的调整, 我们将这种思路简称为 方案A, 那么 方案A 是否可以满足项目的需求呢?

调整的思路也比较直接, 为每个有 coroutine 需要的线程提供专有的 Scheduler, 大家互不影响, 这样每个线程内都依然是一个单线程协程执行环境. 但我们很快就面临了一些问题: - 多个线程都有 Scheduler, 存在引入混乱的风险, 可能会有Scheduler误用的情况发生. - 像 kWorkJob 这种本身就是一组线程的情况, 无法很好的支持. - 更糟糕的, 缺乏统一调度, 整个系统最后的执行状态可能出现 "一核有难, 众核围观" 的名场面了:

缺乏集中的调度和管理, 以及明确的系统级调度支持, 很难可控的在现有的CE下引入协程, 方案A 肯定是不太可行的了, 我们需要寻求更合适的解决方案.


3.2 解决问题的思路 - 方案B

ASIO 新版本也有两个目前还是 Experimental 状态的协程实现, 在 方案A 并不可行的情况下, 我们把目光投向了 ASIO coroutine 实现, 这两版实现也是天然支持多线程的, 那么我们是否可以将原来的单线程 Scheduler 跟 ASIO 做适当的结合, 以此作为我们多线程 coroutine 调度的基础呢?

我们先来看asio croutine里的一段代码:

代码语言:javascript复制
template <typename Executor, typename R, typename... Args>
class async_result<use_awaitable_t<Executor>, R(Args...)>
{
public:
  template <typename Initiation, typename... InitArgs>
  static auto initiate(
    Initiation initiation,
    use_awaitable_t<Executor> u, 
    InitArgs... args) -> return_type
  {
    co_await [&] (auto* frame) {
      return do_init(frame, initiation, u, args...);
    };

    for (;;) {} // Never reached.
#if defined(_MSC_VER)
    co_return dummy_return<typename return_type::value_type>();
#endif // defined(_MSC_VER)
  }
};

整个 ASIO 对通用任务的定制是通过 async_result<>来提供的, 而 coroutine 本身也是通过对 async_result<> 的一个特化来包装实现的. 我们可以直接认为这里是 ASIO coroutine 的执行起点, 从这里开始, asio的实现就在wrapper的路上一去不复返了, do_init() 实现本身, 又是一圈很厚重的 wrapper 实现. 当然还有部分细心的读者看到的匪夷所思的 for(;;){} 死循环以及其后的注释 Never reached, 过多的细节我们不再展开, 直接来看一下笔者对 ASIO 这套实现的总结: 1. 整体代码wrapper较多, 单一coroutine生命周期内代码多次跳转, 增加了不少复杂度 2. 与thread级别的task(Function)比, coroutine的实现缺乏集中的调度器, 父子之间的表达也未转移到相关的对象上, 给分析定位问题造成了进一步的困难 3. 缺乏与thread级别调度上的打通和呼应 4. 定制点问题, 本身设计更多的是考虑与经典callback的结合, 以及最后返回值的callback承接, 对于其他的定制方式, 存在一些局限性. 总结性的看的话, 业务直接上这一套, 理解和维护成本都比较高, 追踪定位问题也非常复杂, 所以 方案B 也是不合适的. 方案A方案B 都不行, 留给我们的可选项不多了, 那么我们接下来应该怎么办呢?


4. 新的多线程协程设计

前面提到的 方案A方案B 都不适用, 我们 "零基" 思考一波, 抛开 ASIO 现有的 coroutine 实现, 如果只考虑 ASIO 实现质量非常高的多线程通用调度这部分, 我们基于此重新考虑在其上的协程调度实现, 是否可以达成我们预期的目标呢? 这种思路也是我们的 - 方案C.


4.1 设计总览

其实asio coroutine现状的来源, 很大程度是没有规划中间层, 如果我们引入: - 跨excution contextcoro_service_manager - 和跟execution_context一一对应的coro_service

通过两层调度器的设计, 各层调度器各司其职, 其实是能够设计出更清晰的执行逻辑的, 整体的设计如下图所示:

接下来我们对设计中的三个核心对象进行具体的展开.


4.2 coro_service

内层的调度器 coro_service 是真正负责协程执行的地方, 如图中所示, 每个 asio::execution_context 都会创建并关联一个 coro_service, coro_service 利用 asio::execution_contextdispatch() 来派发 coroutine 的 resume() 到对应的线程去执行, 另外需要注意的是 coro_service 中实际调度的都是从 coroutine 转换而来的 iasync_task, 这样更有利于我们存储管理 coroutine 的执行状态. 这样做的好处也是相当明显的: - 有线程相关的协程调度器存在, 弥补了 ASIO coroutine 实现缺乏统一管理的问题, 也弥补了监控调试困难的问题. - 通过 coro_service 基本统一了 thread/coroutine 的管理. - 不依赖 asio::async_result<> 做实现扩展, 这样也就没有了asio::async_result<>本身带来的设计限制, 也避免了整体实现陷入 ASIO coroutine 大量依赖 Wrapper 处理各种业务的尴尬局面.


4.3 coro_service_manager

coro_service_manager 对比与 execution_context 一一对应的coro_service, 是具有全局性的, 大部分跨线程相关的操作, 我们都在 coro_service_manager 处进行处理, 这也是两层设计的目的之一, 外层全局性质的 coro_service_manager 负责跨线程部分的处理, 而 coro_service 则主要负责 coroutine 的 resume() 的执行. 同时 coro_service_manager 也是 coroutine 的管理入口, 可以通过它来发起 coroutine 相关的 create(), destroy(), awake() 等操作. 它的存在弥补了 coro_service 本身缺乏的跨线程支持的这部分能力, 同时它与 coro_service 也是分工明确, 各司其职的.


4.4 以 iasync_task 为中心的设计

当然, 除了与execution_context结合的这部分, 我们也需要将callstack打理等具体coroutine相关的状态和操作集中到一个地方, 我们的设计里叫iasync_task, 以它为中心, 重新考虑coroutine在多线程环境下的执行情况:

多线程环境下, 我们将操作iasync_task的线程分为两类: 1. iasync_task的工作线程 -> 负责执行resume()的线程 2. iasync_task的操作线程 -> 调用terminate()等非resume()操作异步任务的线程. 因为coroutine本身的线性执行特点, 从工作线程的角度出发, 有且只会有一个工作线程对同一个 iasync_task 执行resume()操作, 那么如果我们将操作线程的所有操作缓存, 变成operate_queue, 并对它加锁, 在执行resume()的时候去真正操作, 协程的多线程问题就变成了单线程问题了. 当然, 实际操作中, 我们需要遵守一定的规则: 1. 操作线程发起的所有操作, 原则上要通过operate queue进行. 2. 操作线程除了awake()外, 原则上只访问operate queue间接完成自己的操作. 以co_service_manager发起terminate()的实现为例, terminate()调用时仅仅仅只是以有锁的方式在iasync_taskoperate_queue中追加一个等待执行的operate, 当线程下次被工作线程Resume()时, 才会真正的执行Terminate()操作. 这样设计的好处是锁都是小粒度的了, 执行也兼顾了coroutine本身的特点, 避免卷入复杂的数据加锁等操作上. 通过这样的设计, 操作coroutine的线程角色也就明确了, 我们就能够根据不同的线程角色来设计接口. 最后我们来看一下对应coroutine被唤醒的情况:


4.5 Awake机制

多线程环境下, Awake也是采用集中式带lock的方式来处理的, coro_service_manager上会维护相关的Awake Table, Awake Table一般是由协程本身进行插入, 业务逻辑进行唤醒并删除的, 所以本身就是并发的, 需要带锁, 但因为Awake Table的每项数据比较少, Awake执行的操作也仅仅是对对应slot发起Dispatch(), 这里的锁也是轻量的. 而且仅是在需要唤醒的情况下才会工作. 对实际性能的影响比较可控. 另外一点就是IAsyncTask加入Awake Table的时候, 会告知当前的job type, 这样下次唤醒的时候就知道对应的Dispatch() slot了. 另外一点就是 对于sleepnextframe等操作, 因为是直接在工作线程上发起的操作, 并且唤醒操作也是由工作线程本身负责, 所以这里也不需要操作到manager上的wait list, 相当于特定实现的fast path, 进一步降低了对全局manager的依赖.


4.6 章节小结

通过重新明确多线程环境下协程的管理器, 将原来单线程版本的实现拆分为偏全局的coro_service_manager和与execution_context一一对应的coro_service, 通过它们来完成对coroutine的调度管理, 另外通过集中的IAsyncTask完成对协程在跨线程使用状态下的定义, 我们以一个对asio原有的跨线程调度机制低侵入的方式, 重新设计了一个有完整调度器和协程本身状态维护的跨线程版本的协程框架. 下面我们接着展开具体的实现.


5. 具体的实现

先来看一下相关的测试代码(c 20为例):

代码语言:javascript复制
void Task20TestCoroutineWithReturn(coro_service_manager& coro_manager) {
  coro_manager.create_task20(
    JobType::kLogicJob,
    []() -> cotask20<> {

    auto co_ret = co_await tasks::spawn_task(JobType::kWorkJob,
      []() -> cotask20<int> {
        printf("co with return called, in job type:%s!n", gbf::jobs::job_system::this_thread_job_type_name());
        co_await tasks::sleep(500);
        co_return 5;
      });

    printf("co_ret = %dn", co_ret.value());

    printf("before transfer run in :%sn", co_query_job_type_name());
    co_await tasks::transfer(JobType::kWorkJob);
    printf("after transfer run in :%sn", co_query_job_type_name());

    co_return;
  });
}

TEST(ASYNC_TASK20_TEST, CROUTINE_TEST) {
  gbf::jobs::job_system tmp_system;
  tmp_system.init_in_main_thread();
  tmp_system.add_new_slot(JobType::kWorkJob, 1);

  auto& coro_manager = tmp_system.coro_service_manager();
  Task20TestCoroutineWithReturn(coro_manager);

  gbf::threads::ThreadTimer tmp_timer;

  tmp_timer.Reset();

  do {
    gbf::threads::ThisThread::Sleep(1);
    tmp_system.update();
  } while (tmp_timer.GetMilliseconds() < 5000);

  tmp_system.destroy();
}

对应的输出如下:

代码语言:javascript复制
co with return called, in job type:WorkJob!
co_ret = 5
before transfer run in :LogicJob
after transfer run in :WorkJob

上面的示例简单的演示了coroutine在多个线程的迁移和返回值的处理, 可以看到, 通过相关的实现, 我们能够很好的监控和控制协程在指定的线程进行调度, 并且由于协程本身的特点, 用于表达跨线程的线性逻辑, 这套机制是非常适合的.


5.1 框架实现

我们先从一个异步任务的处理流程来剖析一下整个调度器的实现, 先来看一下概览图:

对于一个coroutine来说, 它在调度器中的调度步骤大概如下: 1. manager::create_task() 2. manager::dispatch_async_task_impl() 3. coro_service::dispatch() 4. iasync_task::resume() 5. 根据awaitable::susppend()的返回值做进一步的处理 a. co_service::add_to_next_frame_queue() b. co_service::register_timeout_for_task() c. manager::request_task_suspend() 6. a/b/c条件达成后, 跳转至步骤3继续执行, 直到coroutine执行结束.

整体的流程并不复杂, 最终我们还是利用到了asioexecution_context的能力, 直接dispatch()一个包含iasync_task::resume()调用的lambda到具体的线程上去执行. 跟asio coroutine对比, 最大的差异是我们有了一个多层的调度器表达, 所有的实现不需要像asio coroutine那样:

[!info] 强行依赖async_result, 多次wrapper以处理coroutine callstack的问题, 导致代码很难维护.

新的跨线程实现中, 各层各司其职, 大部分流程都是非常固定的, 对于iasync_task的调度处理, 主要都集中到了coro_service::dispatch()本身, 接下来我们具体展开一下coro_service::dispatch()的实现.


5.1.1 coro_service::dispatch() 的实现

代码语言:javascript复制
void coro_service::dispatch(const async_task_ptr& async_task) {
  job_slot_.dispatch([atask = async_task, this]() {
    do {
      auto running_state = atask->resume(job_slot_.job_type());
      if (running_state < 0) {
        // coroutine run finish, end loop
        break;
      }

      auto await_mode = atask->await_mode();
      if (await_mode == AwaitMode::kAwaitNextframe) {
        if (GBF_LIKELY(support_next_frame_)) {
          add_to_next_frame_queue(atask);
          break;
        } else {
          // Just ignore await here
          continue;
        }
        // add to next frame run, end loop
        break;
      } else if (await_mode == AwaitMode::kAwaitForNotifyNoTimeout || await_mode == AwaitMode::kAwaitForNotifyWithTimeout) {
        // do suspend handle, end loop
        after_suspend_handle(atask, await_mode, atask->await_timeout());
        break;
      } else if (await_mode == AwaitMode::kAwaitDoNothing) {
        // do nothing, end loop
        break;
      } else if (await_mode == AwaitMode::kAwaitNever) {
        // do nothing, repeat again
        continue;
      } else if (await_mode == AwaitMode::kAwaitUnknown) {
        // Just handle as await never here
        continue;
      } else {
        GBF_ERROR(CanNotRunToHereError());
        break;
      }
    } while (true);
  });
}

代码的实现比较简洁, 主要是向底层的job_slot_(可以简单看成一个execution_context)投递一个lambda, lambda主体是一个do{ }while(true)的循环, 主要负责发起对iasync_taskresume()操作, 并根据resume()的结果决定后续的执行, 也就是图中的5a, 5b, 5c对应的路径. 不同的唤醒方式主要是以下几类: - AwaitMode::kAwaitNextframe - 加入coro_sericenext_frame_queue_, 等待下一帧唤醒 - AwaitMode::kAwaitForNotify?TimeOut - 加入coro_manager_servicewaited_task_map_, 等待业务对其发起唤醒. 其中NoTimeOutWithTimeout的区别主要在于WithTimeout除了将 iasync_task 加入manager的等待队列外, 还会注册额外的timeout timer, 带有额外的超时支持功能 - AwaitMode::kAwaitNever: 不等待下次的唤醒继续执行协程 - AwaitMode::kAwaitNothing: 一些特殊的实现如transfer会自己接管协程的后续调度, 直接使用该项通知协程直接退出协程. 根据iasync_task本身绑定的唤醒类型, 满足唤醒条件后, 对应的iasync_task通常会被重新dispatch()到原来的execution_context继续执行.


5.1.2 transfer的实现

transfer是一个比较特殊的实现, 用于在不同的execution_context上调度coroutine的执行, 用于解决类似下图这类在多个线程间调度协程执行的需求:

前面中的代码中我们也实际实用了它:

代码语言:javascript复制
printf("before transfer run in :%sn", co_query_job_type_name());co_await tasks::transfer(JobType::kWorkJob);
printf("after transfer run in :%sn", co_query_job_type_name());

调度前协程是在JobType::kLogicJob对应的线程执行, 完成transfer重新唤醒后, 它会继续在JobType::kWorkJob对应的线程上被重新唤醒执行后续的逻辑.

transfer() 实现也比较简单, 利用上面提到的AwaitMode::kAwaitNothing, 结合:

代码语言:javascript复制
void coro_service_manager::request_task_transfer(iasync_task* task, JobType src_job_type, JobType target_job_type) {
  //Not need add to await list
  dispatch_async_task_impl(target_job_type, task->shared_from_this());
}

在目标execution_context上重新发起对iasync_task的唤醒操作, 整个transfer流程就顺利完成了, 再次被唤醒执行的协程已经运行在了新的线程上, 这也是我们多线程环境下coroutine调度的一个比较重要的特性, 在协程执行的过程中, 你始终可以选择一个符合预期的工作线程来执行当前协程, 当然也有相关的函数去查询当前协程真正运行的线程信息.


这种实现也会带来连贯性上的好处, 比如以异步读取文件为例: Lambda Post 实现:

代码语言:javascript复制
gbf::jobs::job_ticket_ptr ReadDataFromFileAsync(const std::string_view rel_path,
                                                                       const FileLoadFunction& load_func) {
  auto ticket = GJobSystem->RequestTicket();
  auto full_path = GetFullPath(rel_path);
  GJobSystem->Post(
      [this, ticket, rel_path, full_path, load_func]() {
        ByteBufferPtr out_buf;
        try {
          //... Read file from system implement.
        } catch (std::exception& ex) {
          ERR_DEF("read file:[%s] error:%s", fullPath.c_str(), ex.what());
        }

        GJobSystem->Post(
            [out_buf, ticket, rel_path, load_func]() {
              if (ticket) {
                loadFunc(ticket, rel_path, "", out_buf);
              }
            },
            JobType::kLogicJob);
      },
      JobType::kSlowJob);

  return ticket;
}

Coroutine 实现

代码语言:javascript复制
coro::cotask20<ByteBufferPtr> FileSystemModule::AwaitReadDataFromFile(const std::string_view rel_path) { 
  auto* coro_manager = GJobSystem->GetCoroServiceManager();
  std::string full_path = GetFullPath(rel_path);
  auto ret_val = coro_manager->create_task20(JobType::kSlowJob, [full_path, target_job_type]() -> coro::cotask20<ByteBufferPtr> {
    ByteBufferPtr out_buf;
    try {
      //... Read file from system implement.
    } catch (std::exception& ex) {
      ERR_DEF("read file:[%s] error:%s", full_path.c_str(), ex.what());
    }

    co_await coro::tasks::transfer(JobType::kLogicJob);
    co_return out_buf;
  });

  return ret_val; 
}

对比两者的实现, 可以看到本来内嵌的 Lambda Post 表达被平展为了线性表达, 并且协程本身很好的帮我们在切换线程的过程中保留了上下文, 我们不需要像多线程版实现那样手工capture 需要在第二步操作中使用的变量了, 这对于心智负担来说是极大的节约.


5.2 C 20 版的实现

因为整个设计围绕基本就是围绕 C 20 协程来进行的, 所以 C 20 部分需要特殊处理的地方不多, 这里主要给出几个实现上相对特殊的点.


5.2.1 promise_type 的特殊性

一些相对基础的实现里, 我们都是直接将 promise_type 类内置声明到作为协程函数形式返回值的类里的, 如:

代码语言:javascript复制
template <class ReturnType = void>
struct cotask20 {
  struct promise_type {
    //... Something implement for promise type
  };
};

但很多时候我们可能并不是使用内嵌的方式来定义 promise_type 的, 比如 ASIO所使用的方式:

代码语言:javascript复制
template<typename ... Ts, typename Executor>
struct promise_handler<void(Ts...), Executor>
{
  using promise_type = promise<void(Ts...), Executor>;
};

通过直接使用using promise_type = ???;的方式, 我们也可以正确定义一个符合 C 20 要求的协程形式返回值, 我们可以灵活处理相关的情况.


5.2.2 co_return 的处理难点

co_return关键字在 promise_type 里的实现比较特殊, 返回值是 void 的情况, 以及有非 void 返回值的情况, 对应的函数是不同的, 一个是 return_void(), 一个是 return_value(T&& value), 并且两者不能同时存在, 这给封装带来了极大的不便, 不过我们可以按下面的方式来正确的 Traits 出需要的 return_???() 函数:

代码语言:javascript复制
template <typename ReturnType>
struct return_value_or_void {
  struct type {
    template <typename Value = ReturnType, typename Extra = std::enable_if_t<std::is_convertible_v<Value, ReturnType>>>
    void return_value(Value&& value) noexcept(std::is_nothrow_constructible_v<ReturnType, Value>) {
      //...
    }
  };
};
template <>
struct return_value_or_void<void> {

  struct type {
    void return_void() noexcept { 
        //...
    }
  };
};

然后再让 promise_type 继承这个类, 我们就完成了不同 type 可正确匹配 return_???() 的实现了:

代码语言:javascript复制
template <class ReturnType = void>
struct cotask20 {
  struct promise_type : return_value_or_void<ReturnType>::type {
  };
};

这部分掌握了方法很好实现, 但自己想不那么直观, 这里直接列出来一下.


5.2.3 operator co_await() 操作符重载

我们之前讨论过协程定制的几种方式, 其中 promise_type 中的 await_transform 具有排它性, 也就是如果你定义了 await_transform 的某个特化实现, 那么所有的类型你需要都实现它们的 await_transform, 否则会提示 co_await 找不到匹配的类型. 但特定实现, 比如我们 co_await 一个新创建的子协程, 这种情况我们可以利用 operator co_await() 来完成对特定对象的 co_await 扩展:

代码语言:javascript复制
template <class ReturnType = void>
struct cotask20 {
  auto operator co_await() {
    return tasks::cotask_awaitable<ReturnType>((const iasync_task*)bind_task_);
  }
}

通过重载 co_await 操作符, 我们可以将一个非 Awaitable 对象向一个 Awaitable 对象转换, 如上例中我们将 cotask20<> 转换到 cotask_awaitable<>, 在cotask_awaitable<> 中实现正确的逻辑后, 我们就可以直接像前面的示例中那样, 正确的 co_await cotask20<> 了.


5.3 C 17 版的实现

C 17版的协程实现是利用特殊机制Hack出的一套实现, 缺乏compiler的支持, 存在诸如栈变量使用等限制, 所以我们实现C 17版的目标也比较明确, 尽量对齐与C 20的使用体验, 框架层面提供一些对比C 20缺失的特性: - 与C 20特性对齐的awaitable实现 - 特殊对象如cotask17<ReturnType>依赖的operator co_await()机制 下文我们分别就具体的实现详细展开这些内容.


5.3.1 与C 20特性对齐的awaitable实现

这其实包括两部分的处理: - 以重载的方式提供类似20的 await_ready(), await_suspend(), await_resume() 实现 - co_await表达式对返回值类型的约束和传递第一个部分比较简单, 我们为C 17的基类提供特定的虚函数并在框架中正确处理相关的逻辑, 就能够很好的跟C 20awaitable`对象对齐了:

代码语言:javascript复制
class iawaitable17 {
  virtual bool is_await_ready() { return false; }
  virtual void invoke_suspend(async_task17* task, coro_service_manager* manager) = 0;
  virtual void invoke_resume(async_task17* task, coro_service_manager* manager) {}
};

比较特殊的是第二部分, 我们知道C 20await_resume()的返回值将作为co_await表达式的返回值直接返回给协程, 而在C 17下, 我们肯定是没有办法做到与C 20完全一致的体验的, 但我们还是可以将复杂度转移到iawaitable17和框架本身, 尽量降低业务侧的使用负担.


5.3.2 operator co_await()机制

c 17的标准实现并没有提供co_await关键字, 以及oprerator co_await()操作符重载, 但我们可以先实现一个辅助模板, 再利用它来模拟一个相关实现:

代码语言:javascript复制
//specialized here for enable await transform in c  17 coroutine
template <typename T, typename U = void>
struct cotask17_need_await_transform {
  static constexpr bool value = false;
  //Must implement co17_transform function here when value is true
  // static something_awaitable co17_transform(T&& src); 
};

然后在执行__co_await()操作的时候, 将对应的类型向它对应的awaitable转化, 代码如下所示:

代码语言:javascript复制
template <typename AwaitableType>
  void do_awaitable_suspend(AwaitableType&& awaitable) {
    using transform_helper = cotask17_need_await_transform<AwaitableType>;
    if constexpr (std::is_base_of_v<tasks17::iawaitable17, AwaitableType>) {
      //normal awaitable just call impl version here
      do_awaitable_suspend_impl(std::forward<AwaitableType>(awaitable));
    } else if constexpr (transform_helper::value) {
      //transform awaitable support here, do transform first, then call impl version
      do_awaitable_suspend_impl(transform_helper::co17_transform(std::move(awaitable)));
    } else {
      SAFE_STATIC_ASSERT_FALSE("Invalid awaitable type for async_task17::do_awaitable_suspend()!");
    }
  }

C 17__co_await(cotask17<ReturnType>)举例, 我们先特化它的cotask17_need_await_transform:

代码语言:javascript复制
template <typename T>
struct cotask17_need_await_transform<T, typename std::enable_if_t<cotask17_type_traits<T>::value> > {
  using co_traits = cotask17_type_traits<T>;
  using co_return_type = typename co_traits::co_return_type;
  static constexpr bool value = true;
  static tasks17::cotask17_awaitable<co_return_type> co17_transform(T&& src) {
    return tasks17::cotask17_awaitable<co_return_type>(src.get_bind_task());
  }
};

然后通过已经实现好的 tasks17::cotask17_awaitable<ReturnType> 类, 我们即可做到__co_await(sub_coroutine)的功能了, 对应的代码比较多, 这里不再贴出, 我们主要关注对应的机制本身, 我们也可以来看一下C 20对应的实现:

代码语言:javascript复制
template <class ReturnType = void>
struct cotask20 {
  auto operator co_await() {
    return tasks::cotask_awaitable<ReturnType>((const iasync_task*)bind_task_);
  }
};

一个操作符重载就完成了, 从这里也能看出C 新特性带来的便利, 有特性支持的情况下, 我们可以用极少的代码实现一些复杂的功能.


6. coroutine 的定制点

当前框架下, 我们有三种定制扩展协程的方式: 1. 带具体返回值的协程函数 -> 利用cotask20<RetType>或者cotask17<RetType>我们可以方便的给模块增加协程接口, 用协程实现异步逻辑. 与现在的模块机制可以很好的结合. 2. awaitable -> 一些内置的功能, 如tasks::sleep, tasks::transfer等都可以通过这种方式来实现, 方便提供一些可复用的异步语义给业务侧使用. 3. use_awaitable_t 机制 -> asio coroutine的特色, 对传统callback的自动转换, 大量老业务需要兼容的时候可以考虑这种方式, 但对于新的系统, 很多时候直接实现对应的awaitable或许是更好的选择. 因为机制本身依赖比较重度的wrapper, 目前这种方式暂不支持.


7. 进一步的思考

7.1 DAG支持

taskflow的示例来说, 对于下图:

一个可能的taskflow代码实现如下:

代码语言:javascript复制
#include <taskflow/taskflow.hpp>  // Taskflow is header-only

int main(){

  tf::Executor executor;
  tf::Taskflow taskflow;

  auto [A, B, C, D] = taskflow.emplace(  // create four tasks
    [] () { std::cout << "TaskAn"; },
    [] () { std::cout << "TaskBn"; },
    [] () { std::cout << "TaskCn"; },
    [] () { std::cout << "TaskDn"; } 
  );                                  

  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after  B and C

  executor.run(taskflow).wait(); 

  return 0;
}

可以看到, 对于DAG来说, 直接显式的对node进行编码连接, 比我们使用wait()等去模拟同样的实现要直接的多, 使用节点的precede()succeed()样式的表达, 能够更好的适应一些非线性的执行场景实现. taskflow的实现中, 我们可以看到它会为每个lambda生成一个Handle, 每个Handle对应DAG中的一个节点, 也就是上例中的A, B, C, D, 然后再通过 Handleprecede()succeed()操作来进一步描述各 Handle 之间的关系. 而对于asio来说, 设计上, 它并没有为每个lambda分配句柄, 所以我们如果要实现对DAG的友好支持, 可以在已经是按Handle方式来实现的异步coroutine的基础上对DAG本身进行包装, 这是一种可行的方案, 如下面的代码所示:

代码语言:javascript复制
auto dag = create_dag();
auto co_a = dag.create_task(JobType::kWorkJob, []() -> cotask20<> { std::cout << "TaskA" << std::endl; });
auto co_b = dag.create_task(JobType::kWorkJob, []() -> cotask20<> { std::cout << "TaskB" << std::endl; });
auto co_c = dag.create_task(JobType::kWorkJob, []() -> cotask20<> { std::cout << "TaskC" << std::endl; });
auto co_d = dag.create_task(JobType::kWorkJob, []() -> cotask20<> { std::cout << "TaskD" << std::endl; });
//A runs before B and C
dag.precede(co_a, co_b, co_c);
//D runs after B and C
dag.succeed(co_d, co_b, co_c);
dag.dispatch(JobType::kWorkJob).wait();

这种方案的好处就是跟coroutine本身结合得很紧密, 每个节点都能够很自然的使用coroutine对实现进行表达, 缺点也比较明显, 对比post lambda的线程方案来说, 会多出这部分的coroutine创建开销, 但如果系统中涉及的节点数是可控的, 这种方案自然也是比较贴合我们目前设计的实现了. 而且对于lambda post的实现来说, 在节点间传值, 也因为coroutine的使用而变得可能. 我们完全可以扩展相关实现, 支持节点间的值传递.


7.2 关于CrossEngine的异步框架实现最终选型

我们最终的选择是以前面说到的 方案C 为基础, 再结合 DAG 实现, 最终用于支持我们的并发和异步逻辑. 对于未来的 executions 提案, 这里也给出一些我们总结的一些思考, 首先整个executions的抽象如下图所示:

它本身的思路是非常 "高级" 的, 先在 C 中定义一个异步的 DSL, 再以此为基础来对结构化并发做支持, 自然而然的, 这种设计下, 线性表达, DAG 类的非线性表达, 都能在其中很好的实现, 但它也存在底层机制理解成本高, tag_invoke 的使用带来了大量的代码噪声, 相关代码阅读并不是一个很直观的事情, 所以在它正式进入未来的 C 26 标准前, 我们都暂时先不会考虑引入它, 使用目前的多线程 coroutine 调度 DAG 的方式, 在比较长一段时间都能够很好的服务好 CrossEngine项目了. 这里也直接给出我们对几种异构框架实现的横向对比供大家参考:


8. 总结

整个跨线程的协程支持从构思到实现跨越的周期比较长, 很长一段时间没有比较有效的思路, 期间也如前文介绍的那样重新阅读了 ASIO 的 croutine 实现, 以期找到有效的解决方法, 最终还是以多线程通用任务调度作为基础, 重新思考了 coroutine 在多线程状态下的执行情况, 再结合部分来自 taskflow 的设计, 以及 CrossEngine 本身做的 DAG 实现, 才有了目前这版的实现, 整个过程记录于本文方便整理自己思路的同时也方便大家的参考.


9. Reference

  • <<从无栈协程到C 异步框架>>
  • asio官网
  • Modern C Parallel Task Programming - taskflow
  • href="https://www.zhihu.com/people/wuye9036">来自 @空明流转 的友情指导

0 人点赞