前言
最近的新闻里 C 20 已经确认的内容里已经有了协程组件,之前都是粗略看过这个协程草案。最近抽时间更加系统性的看了下接入和实现细节。
我的测试代码都是在MSVC下开启 /await 选项后测试的,在我本地的Linux clang环境中,可以通过 LLVM_CLANG_PREFIX/bin/clang -std=c 2a -O0 -g -ggdb -stdlib=libc -fcoroutines-ts -lc -lc abi -Wl,-rpath=LLVM_CLANG_PREFIX/lib/ test.cpp 编译和运行。
在gcc 10 中,可以使用 g -std=c 20 -O0 -g -ggdb -fcoroutines
并把所有的 std::experimental::
都换成 std::
之后编译运行。
LLVM Clang libc /libc abi的编译安装脚本可以参见: https://github.com/owent-utils/bash-shell/tree/master/LLVM&Clang Installer/7.0/
C 20 的协程基本原理
C 20 整个协程体系是 “无栈协程” 的思路,整个功能是需要结合编译器功能和STL来配合实现的。主要就是三个关键字(co_yield
、 co_await
或 co_return
)和围绕这三个关键字的接入。无栈协程对API的设计是有要求的,C 20 Coroutine也不例外, 编译器在检测到内部有使用 这三个关键字时会对函数的流程做patch,然后它的返回值类型必须符合你所使用的关键字的规范。这三个关键字的规范要求不太一样,下面会列举。
我原本以为的会放在协程的 awaiter 或者 handle 对象闭包里,然后由编译器分析和对闭包内的各级对象进行扩充的引用(类似Rust的那种实现)。但是在测试的MSVC和Clang的协程流程的过程中发现,实际上还是另外堆上分配空间来保存协程函数的栈上数据,并用这种方式实现Zero-Copy的。协程函数的执行栈和主函数执行栈并不在一个地址段内,这和之前猜想的不太一样。所以,C 20 的协程也不能完全说是 “无栈” ,只是在协程函数中需要能够评估出来它需要多少栈空间存数据,不像有栈协程那样会浪费比较大的地址空间且不利于内存页复用。
同时受限于这种设计,在C 20 的协程函数里,动态栈分配是不受支持的。在MSVC下,如果你使用了动态栈分配的函数 ( _alloca
) ,直接编译就不通过了。而在gcc/clang 下如果你使用了动态栈分配的函数 ( alloca
) ,分配出来的栈地址是不会受到协程的管理(即:多个协程分配出来的地址可能是重合的),在使用的时候用户得自己保证如果涉及协程且如何切出的话,运行结果不受这部分动态长度栈数据的影响,因为可能会被其他协程改掉(简单地说就是动态分配出来的栈只能在 co_yield
和 co_await
之前使用)。
不过我觉得类似GCC动态栈的那种方案可以让它支持动态栈空间,就是在栈溢出的signal里再mmap一段地址进去,按需增大栈空间。但是这玩意性能被诟病,信号和缺页中断都不稳定且栈空间地址分散不利于CPU Cache,估计最后也不会被采纳吧。
我目前看的提案以 N4736 为准(还有个使用文档是 p0973r0 )。一旦一个函数被断定为协程函数,那么它会被扩充为如下形式:
代码语言:javascript复制2019-09-29: 更新文档 https://en.cppreference.com/w/cpp/language/coroutines 这个 N4775 比 N4736 完整得多 : http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4775.pdf 后面会整合进 P0912R5: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p0912r5.html 目前还没有太大的变化。
COROUTINE_OBJECT func(args...) {
try {
P p(promise_constructor_arguments);
// 这个P是自己定义的 using P = COROUTINE_OBJECT::promise_type;
// 文档上说promise_constructor_arguments 是空或者函数的参数的左值传入 args... ,但是目前版本的MSVC还仅支持空参数列表
COROUTINE_OBJECT r = p.get_return_object(); // MSVC 1, Clang 2
co_await p.initial_suspend(); // 初始化接口 MSVC 2, Clang 1
// 上面这两行是MSVC的顺序,在clang里上面两行的顺序相反
try {
// 原函数体 ...
p.return_void() or p.return_value(RET) // 取决于函数体里有没有 co_return RET
} catch(...) {
p.unhandled_exception(); // 未捕获的异常接口
}
final_suspend:
co_await p.final_suspend(); // final suspend point
return r;
} catch(...) {
return COROUTINE_OBJECT::promise_type::get_return_object_on_allocation_failure(); // noexcept
}
}
关键字 co_await
先简单理解为判定是否需要切出,后面会有更加详细一点的解释。 而协程函数一切的核心都在于上面的 COROUTINE_OBJECT
类型。里面有一些规范, co_yield
和 co_return
涉及 COROUTINE_OBJECT
里的 COROUTINE_OBJECT::promise_type
类型。必须实现某些公共接口和函数功能接口。而返回到外层的 COROUTINE_OBJECT
对象里也需要保存协程的handle(目前是 std::experimental::coroutine_handle<PROMISE_TYPE>
),以用于后续控制协程的上下文切换使用。 同样, 下面的例子因为必须写一个协程入口对象,所以有一部分比较烦杂的必须接入的代码。
结构如下:
协程闭包类型(promise)
要支持协程函数,首先要准备一个包装的类型,里面包含 promise_type
,然后提供基本的创建、维护handle的函数。比如:
struct coroutine_task {
struct promise_type {
coroutine_task get_return_object() {
return coroutine_task{};
}
bool initial_suspend() const {
return false;
}
bool final_suspend() const {
return false;
}
};
};
然后,就可以声明使用协程函数 coroutine_task f();
了。但是要让他转变为协程函数,还需要至少接入一样协程关键字才行。我们先从最基本的 co_await
开始。
关键字 - co_await
关键字 co_await
是一个操作符,所以我们只要实现这个操作符重载就可以实现协程等待任意类型。 当然,返回的类型要求实现 bool await_ready(T&) noexcept
、 void await_suspend(T&, std::experimental::coroutine_handle<T>) noexcept
、 void await_resume(T&) noexcept
这三个接口。比如我们实现一个 co_wait
后结束的结构,可以按下面这种接入方式:
struct wait_some_times {
int left_times;
std::experimental::coroutine_handle<> handle;
wait_some_times(int t) : left_times(t), handle(nullptr) {}
};
struct suspend_some_times {
wait_some_times& d;
suspend_some_times(wait_some_times& _d): d(_d) {}
bool await_ready() noexcept {
std::cout << "call await_ready: " << d.left_times << std::endl;
return d.left_times <= 0;
}
void await_suspend(std::experimental::coroutine_handle <> h) noexcept {
// 记下来handle以便后面resume用
d.handle = h;
std::cout << "call await_suspend: " << (--d.left_times) << std::endl;
}
void await_resume() noexcept {
std::cout << "call await_resume: " << d.left_times << std::endl;
d.handle = nullptr;
}
};
然后把整个连起来,完整的例子如下:
代码语言:javascript复制#include <iostream>
#include <iomanip>
#include <vector>
#include <memory>
#include <experimental/coroutine>
struct wait_some_times {
int left_times;
std::experimental::coroutine_handle<> handle;
wait_some_times(int t) : left_times(t), handle(nullptr) {}
};
struct suspend_some_times {
wait_some_times& d;
suspend_some_times(wait_some_times& _d) : d(_d) {}
bool await_ready() noexcept {
std::cout << "call await_ready: " << d.left_times << std::endl;
return d.left_times <= 0;
}
void await_suspend(std::experimental::coroutine_handle <> h) noexcept {
// 记下来handle以便后面resume用
d.handle = h;
std::cout << "call await_suspend: " << (--d.left_times) << std::endl;
}
void await_resume() noexcept {
std::cout << "call await_resume: " << d.left_times << std::endl;
d.handle = nullptr;
}
};
struct coroutine_task {
struct promise_type {
coroutine_task get_return_object() {
return coroutine_task{};
}
auto initial_suspend() {
return std::experimental::suspend_never{};
}
auto final_suspend() {
return std::experimental::suspend_never{};
}
void unhandled_exception() {}
void return_void() {}
};
};
auto operator co_await(wait_some_times& x) noexcept {
return suspend_some_times{ x };
}
coroutine_task f(wait_some_times& waiter) {
std::cout << "begin to co_await" << std::endl;
co_await waiter; // 只有前三次会协程切出
co_await waiter;
co_await waiter;
co_await waiter; // 这之后await_ready返回true了,不会再切出
co_await waiter;
std::cout << "end of corotine" << std::endl;
}
int main(int argc, char* argv[]) {
#ifdef __cpp_coroutines
std::cout << "__cpp_coroutines: " << __cpp_coroutines << std::endl;
#endif
wait_some_times waiter{ 3 };
f(waiter);
while (waiter.handle && !waiter.handle.done()) {
std::cout << "about to resume: " << waiter.left_times << std::endl;
// 这里用传出的handle来恢复切入协程
waiter.handle.resume();
}
return 0;
}
输出如下:
代码语言:javascript复制__cpp_coroutines: 201703
begin to co_await
call await_ready: 3
call await_suspend: 2
about to resume: 2
call await_resume: 2
call await_ready: 2
call await_suspend: 1
about to resume: 1
call await_resume: 1
call await_ready: 1
call await_suspend: 0
about to resume: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
end of corotine
关键字 - co_yield
关键字 co_yield
要求实现 COROUTINE_OBJECT::promise_type::yield_value(参数)
。比较贴近于单独一次异步调用的实现。
简单的描述 co_yield VALUE
就是相当于 co_await p.yield_value(VALUE)
。
#include <iostream>
#include <iomanip>
#include <vector>
#include <memory>
#include <experimental/coroutine>
struct test_rpc_generator {
test_rpc_generator(const test_rpc_generator&) = delete;
test_rpc_generator(test_rpc_generator&& other): coro(other.coro) {
other.coro = nullptr;
};
~test_rpc_generator() {
if (coro) {
coro.destroy();
}
}
struct promise_type;
using handle = std::experimental::coroutine_handle<promise_type>;
struct promise_type {
int* current_value;
static auto get_return_object_on_allocation_failure() {
return test_rpc_generator{nullptr};
}
auto get_return_object() {
return test_rpc_generator{handle::from_promise(*this)};
}
auto initial_suspend() {
current_value = nullptr;
return std::experimental::suspend_never{};
}
auto final_suspend() {
return std::experimental::suspend_always{};
}
void unhandled_exception() {
std::terminate();
}
void return_void() {
}
auto yield_value(int* value) {
current_value = value;
return std::experimental::suspend_always{};
}
};
int* value() const {
if (coro) {
return coro.promise().current_value;
}
return 0;
}
bool move_next(int rpc_result) {
if (coro && coro.promise().current_value) {
*coro.promise().current_value = rpc_result;
}
return coro ? (coro.resume(), !coro.done()) : false;
}
bool await_ready() const {
return !coro || coro.done();
}
private:
test_rpc_generator(handle h) : coro(h) {}
handle coro;
};
test_rpc_generator f() {
int rpc_res1, rpc_res2;
co_yield &rpc_res1;
// _alloca(rpc_res1);
std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << std::endl;
co_yield &rpc_res2;
// _alloca(rpc_res2);
std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << ", rpc_res2: "<< rpc_res2 <<"(@"<< &rpc_res2<< ")"<< std::endl;
}
int main(int argc, char * argv[]) {
#ifdef __cpp_coroutines
std::cout<< "__cpp_coroutines: "<< __cpp_coroutines<< std::endl;
#endif
int rpc_fake_data = 1;
auto g1 = f();
auto g2 = f();
void* detect_addr = malloc(4000);
std::cout << "detect_addr:" << detect_addr << std::endl;
free(detect_addr);
for (bool is_continue = true; is_continue; is_continue = (!g1.await_ready() || !g2.await_ready())) {
if (!g1.await_ready()) {
g1.move_next( rpc_fake_data);
std::cout << "g1 value:" << g1.value() << std::endl;
}
if (!g2.await_ready()) {
g2.move_next( rpc_fake_data);
std::cout << "g2 value:" << g2.value() << std::endl;
}
}
return 0;
}
我这里一次示例输出是:
代码语言:javascript复制__cpp_coroutines: 201703
detect_addr:00881BD0
resumed got rpc_res1: 2(@0087F96C)
g1 value:0087F980
resumed got rpc_res1: 3(@00881B1C)
g2 value:00881B30
resumed got rpc_res1: 2(@0087F96C), rpc_res2: 4(@0087F980)
g1 value:0087F980
resumed got rpc_res1: 3(@00881B1C), rpc_res2: 5(@00881B30)
g2 value:00881B30
关键字 - co_return
最后一个是 co_return
。 这个关键字主要是用于直接退出协程函数的, 因为协程函数的返回值是我们自己定义的这个 COROUTINE_OBJECT
, 所以函数逻辑附带的返回值就要用这个关键字来实现。 这个关键字要求实现 COROUTINE_OBJECT::promise_type::return_value(参数)
或者 COROUTINE_OBJECT::promise_type::return_void()
。如果协程函数中有 co_return VALUE
。则是最终调用了 COROUTINE_OBJECT::promise_type::return_value(VALUE)
, 否则是调用 COROUTINE_OBJECT::promise_type::return_void()
。
我们可以把协程函数的最终结果放在这里面来实现转储到某个地方。这部分的代码和 yield 的很像。就不另外贴了,下面贴一个全部整合到一起的吧。
全功能整合到一起
我们来一个功能完整,并且贴近单线程工程并且时候异步IO的实践的例子。
代码语言:javascript复制#include <iostream>
#include <iomanip>
#include <vector>
#include <memory>
#include <experimental/coroutine>
static std::vector<std::pair<int*, std::experimental::coroutine_handle<> > > g_test_rpc_manager;
static int g_test_rpc_fake_data = 0;
struct test_rpc_generator {
struct test_rpc_data {
int final_value;
int yield_times;
std::vector<std::experimental::coroutine_handle<> > follower;
};
struct promise_type;
using data_ptr = std::shared_ptr<test_rpc_data>;
struct promise_type {
data_ptr data;
static auto get_return_object_on_allocation_failure() {
return test_rpc_generator{ nullptr };
}
auto get_return_object() {
data = std::make_shared<test_rpc_data>();
if (data) {
data->final_value = 0;
data->yield_times = 0;
}
return test_rpc_generator{ data };
}
auto initial_suspend() {
return std::experimental::suspend_never{}; // STL提供了一些自带的awaiter实现,我们其实很多情况下也不需要另外写,直接用STL就好了
}
auto final_suspend() {
return std::experimental::suspend_always{}; // 和上面一样,也是STL自带的awaiter实现
}
void unhandled_exception() {
std::terminate();
}
// 用以支持 co_return
void return_value(int v) {
// 最终co_return时保存最终数据
if (data) {
data->final_value = v;
auto followers = std::move(data->follower);
for (auto& h : followers) {
h.resume();
}
}
}
// 用以支持 co_yield
auto yield_value(int* value) {
// 每次调用都会执行,创建handle用以后面恢复数据
g_test_rpc_manager.emplace_back(std::make_pair(value, std::experimental::coroutine_handle<>::from_address(
std::experimental::coroutine_handle<promise_type>::from_promise(*this).address()
)));
if (data) {
data->yield_times;
}
return std::experimental::suspend_always{};
}
};
// 下面的接入用侵入式的方式支持 co_await test_rpc_generator
// MSVC 目前支持使用非侵入式的方式实现,但是clang不支持
bool await_ready() noexcept {
return value() > 0;
}
void await_resume() {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator resume for " << yield_times() << " time(s)" << std::endl;
}
void await_suspend(std::experimental::coroutine_handle<> h) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator yield for " << yield_times() << " time(s), wait for " << h.address() << std::endl;
// 记录要恢复父协程
if (h) {
add_follower(h);
}
}
int value() const {
if (data) {
return data->final_value;
}
return 0;
}
int yield_times() const {
if (data) {
return data->yield_times;
}
return -1;
}
void add_follower(std::experimental::coroutine_handle<> h) {
if (data) {
data->follower.emplace_back(std::move(h));
}
}
private:
test_rpc_generator(data_ptr d) : data(d) {}
data_ptr data;
};
// 异步协程函数
test_rpc_generator f() {
int rpc_res1, rpc_res2;
co_yield &rpc_res1;
// _alloca(rpc_res1);
std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << std::endl;
co_yield &rpc_res2;
// _alloca(rpc_res2);
std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << ", rpc_res2: " << rpc_res2 << "(@" << &rpc_res2 << ")" << std::endl;
// 模拟多次RPC然后返回最终结果
co_return rpc_res1 * 100 rpc_res2;
}
// 这里模拟生成数据
void test_rpc_manager_run() {
std::vector<std::pair<int*, std::experimental::coroutine_handle<> > > rpc_manager;
g_test_rpc_manager.swap(rpc_manager);
for (auto& generator : rpc_manager) {
if (generator.first) {
*generator.first = g_test_rpc_fake_data;
}
if (generator.second && !generator.second.done()) {
generator.second.resume();
}
}
}
struct test_task {
using ptr_t = std::shared_ptr<test_task>;
test_task(int ms) : status(0), max_status(ms) {}
bool is_ready() const noexcept {
return status >= max_status;
}
int status;
int max_status;
};
struct test_task_future {
struct promise_type;
using ptr_t = std::shared_ptr<test_task>;
using handle = std::experimental::coroutine_handle<promise_type>;
struct promise_type {
static auto get_return_object_on_allocation_failure() {
return test_task_future{ nullptr };
}
auto get_return_object() {
return test_task_future{ handle::from_promise(*this) };
}
auto initial_suspend() {
return std::experimental::suspend_never{};
}
auto final_suspend() {
return std::experimental::suspend_always{};
}
void unhandled_exception() {
std::terminate();
}
void return_void() {
for (auto& h : follower) {
h.resume();
}
}
// 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
auto yield_value(ptr_t t) {
task = t;
return std::experimental::suspend_never{};
}
ptr_t task;
std::vector<handle> follower;
};
// 下面的接入用侵入式的方式支持 co_await test_task::ptr_t
struct awaitable {
awaitable(const test_task_future& pt) {
if (pt.coro) {
data = pt.coro.promise().task;
coro = pt.coro;
}
};
bool await_ready() const {
bool ret = !data || data->is_ready();
if (ret) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " ready" << std::endl;
}
return ret;
}
void await_resume() {
if (data) {
data->status;
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " resume to " << (data ? data->status : -1) << std::endl;
}
}
void await_suspend(handle h) {
if (data) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " suspend test_task_future::handle from " << (data ? data->status : -1) <<
", wait for " << h.address() << std::endl;
}
if (coro && h) {
coro.promise().follower.emplace_back(h);
}
}
ptr_t data;
handle coro;
};
bool done() const {
return !coro || coro.done();
}
private:
test_task_future(handle h) : coro(h) {}
handle coro;
};
// 接入 co_await test_task::ptr_t
auto operator co_await(const test_task_future & pt) noexcept {
return test_task_future::awaitable{ pt };
}
test_task_future h(test_task::ptr_t task) {
// 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
co_yield task;
// 模拟任务内部流程并调用外部RPC
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
test_rpc_generator rpc_res = f();
co_await rpc_res;
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << " call f() ret: " << rpc_res.value() << std::endl;
}
test_task_future g(test_task::ptr_t task) {
// 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
co_yield task;
// 等待子任务完成
while (task && !task->is_ready()) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
co_await h(task);
}
}
int main(int argc, char* argv[]) {
#ifdef __cpp_coroutines
std::cout << "__cpp_coroutines: " << __cpp_coroutines << std::endl;
#endif
// 创建一个任务
test_task::ptr_t task = std::make_shared<test_task>(3);
// 运行任务
auto fut = g(task);
// 模拟从外部获取数据然会恢复协程
while (!fut.done()) {
test_rpc_manager_run();
}
return 0;
}
这回贴一下linux内clang的输出吧:
代码语言:javascript复制__cpp_coroutines: 201703
g : 280: task 0x1d3c028
h : 268: task 0x1d3c028
await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3c320
await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 0, wait for 0x1d3c040
resumed got rpc_res1: 1(@0x1d3c730)
resumed got rpc_res1: 1(@0x1d3c730), rpc_res2: 2(@0x1d3c734)
await_resume : 85: test_rpc_generator resume for 2 time(s)
h : 271: task 0x1d3c028 call f() ret: 102
await_resume : 229: task 0x1d3c028 resume to 1
g : 280: task 0x1d3c028
h : 268: task 0x1d3c028
await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3c850
await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 1, wait for 0x1d3c040
resumed got rpc_res1: 3(@0x1d3cc60)
resumed got rpc_res1: 3(@0x1d3cc60), rpc_res2: 4(@0x1d3cc64)
await_resume : 85: test_rpc_generator resume for 2 time(s)
h : 271: task 0x1d3c028 call f() ret: 304
await_resume : 229: task 0x1d3c028 resume to 2
g : 280: task 0x1d3c028
h : 268: task 0x1d3c028
await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3cd40
await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 2, wait for 0x1d3c040
resumed got rpc_res1: 5(@0x1d3d150)
resumed got rpc_res1: 5(@0x1d3d150), rpc_res2: 6(@0x1d3d154)
await_resume : 85: test_rpc_generator resume for 2 time(s)
h : 271: task 0x1d3c028 call f() ret: 506
await_resume : 229: task 0x1d3c028 resume to 3
promise/future 支持
我本地测试的Clang版本(7.0.1)尚未实现支持。 MSVC的标准库用偏特化的方式对 std::promise 和 std::future 实现了协程的接入。它的接入代码如下(为了简短,精简掉了一个偏特化实现,流程和不特化的类似):
代码语言:javascript复制namespace experimental {
template <class _Ty, class... _ArgTypes>
struct coroutine_traits<future<_Ty>, _ArgTypes...> { // defines resumable traits for functions returning future<_Ty>
struct promise_type {
promise<_Ty> _MyPromise;
future<_Ty> get_return_object() {
return _MyPromise.get_future();
}
bool initial_suspend() const {
return false;
}
bool final_suspend() const {
return false;
}
template <class _Ut>
void return_value(_Ut&& _Value) {
_MyPromise.set_value(_STD forward<_Ut>(_Value));
}
void set_exception(exception_ptr _Exc) {
_MyPromise.set_exception(_STD move(_Exc));
}
};
};
} // namespace experimental
template <class _Ty>
bool await_ready(future<_Ty>& _Fut) {
return _Fut._Is_ready();
}
template <class _Ty>
void await_suspend(future<_Ty>& _Fut,
experimental::coroutine_handle<> _ResumeCb) { // change to .then when future gets .then
thread _WaitingThread([&_Fut, _ResumeCb] {
_Fut.wait();
_ResumeCb();
});
_WaitingThread.detach();
}
template <class _Ty>
auto await_resume(future<_Ty>& _Fut) {
return _Fut.get();
}
可以看到,它的 co_await std::future<T>
挂起是开了个新线程来等待,真他喵暴力,建议不要用。但是还是可以比较容易地让自己的管理器接入 co_await
。
我们借助STL的协程接入, 可以实现一个最小化的自定义协程支持:
代码语言:javascript复制#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <experimental/coroutine>
struct custom_rpc_generator {};
struct suspend_custom_rpc : public std::experimental::suspend_always {
void await_suspend(std::experimental::coroutine_handle<> h) noexcept {
std::thread thd{ [h]() {
using namespace std;
std::this_thread::sleep_for(2s);
std::cout << "start to resume coroutine" << std::endl;
h.resume();
} };
thd.detach();
}
};
auto operator co_await(const custom_rpc_generator&) {
return suspend_custom_rpc{};
}
std::future<void> outter_fn() {
co_await custom_rpc_generator{};
}
int main() {
auto fut = outter_fn();
std::cout << "start to wait future" << std::endl;
fut.wait();
std::cout << "future finished, ready to exit" << std::endl;
return 0;
}
总结
总体感觉上,C 20协程为了兼顾灵活和支持非侵入式接入,设计了好几个互相交织的大模块,函数级有处理协程函数内部的 promise_type
、 协程函数对外暴露的交互对象 (我们这里统称为 future
) 、 用于协程上下文切换的 handle
,单次切换有用于支持await功能的 awaitable
和一些回调函数接口。几个对象之间的数据共享也并不是很方便。而且和传统有栈协程的区别仅仅是约束了返回值类型,并且可以依次在编译期推断出需要多少栈空间,从而减少浪费。
我打算后面有时间尝试对 libcopp 接入C 协程支持,在研究C 协程的时候也想到几个问题。在和 ultramanhu 讨论了一下以后主要的问题也有了一些初步的解决方案的想法,但是目前细节上还是有一些没太想清楚的地方。
首先是如果业务有自己的线程池,其实还是要由管理层来控制resume,不能直接像STL那样开线程,那基本上就和 std::future<T>
say Good-Bye 了。虽然在小心维护的情况下,避免 co_await std::future<T>
也是可以避免STL乱开线程的,但是我觉得一旦使用了,后面就很难控制住。特别是这个C 协程的对象关系互耦合插如此严重的情况下,本身就不容易理解。
第二个问题是调用链。C 协程接口设计是非对称的,我们实际业务中,肯定还是需要对称协程的支持,即子协程结束后能够自动恢复 co_await
它的父协程。这个需要我们自己去实现,上面 std::future<T>
也就是这里是开了个线程去实现的(都开线程了还用协程干啥)。在上面的sample代码中,我是开了一个共享数据区,在 await_suspend 的时候去追加等待链,在 return_void/return_value
的时候去执行父级协程的恢复切入,这里面写成vector是为了支持N个协程await一个协程的情况,实际上用链表会更好一些。这里有个比较“脏”的设计是handle的加入是在 awaitable
执行挂起的时候,而恢复是在 promise_type
的 return_void/return_value
事件里。这就分了两个地方。这里的 awaitable
对 promise_type
和N个协程等一个协程是保持一致的N:1关系。但是 awaitable
仅仅能访问 future
, 要让 promise_type
也能访问 future
的话,一种方法是开一个共享数据块,在 promise_type
创建 future
的时候传进去这样了。这也是上面sample代码里 using data_ptr = std::shared_ptr<test_rpc_data>;
的实现。 MSVC的STL的实现也是这样 promise::get_future()
是使用了promise内部的数据创建的future对象。 另外一种尝试的方法是后面task的,进入协程后先 co_yield
一次,然后 yield_value(VALUE)
函数返回 std::experimental::suspend_never
。这样不会造成协程挂起,并且可以给 promise_type
注入任意数据。不过这样就有个约定式的规范了,也不是很严谨。这方面等后面支持了 promise_type
的带参数构造函数可能可以好一些。
第三个问题是handle提前结束的问题。在看了MSVC的实现,这个handle是可以copy也可以转换的。copy开销也很小,里面只包含一个和 promise_type
地址有关的指针(就是 promise_type
的地址然后加了 align和padding)。比如一个RPC任务,我可能copy一个handle用来在有数据的时候resume,然后我还会copy一个handle在超时的时候强行resume然后走失败流程。现在这种情况,就是需要在 awaitable
的 await_suspend 里添加这两个handle到两个manager里,然后在 await_resume 里要把这两个都移除。不管哪个流程,肯定有一个handle是处于resume回调中的,这还涉及递归调用的问题(删除正在resume过程中的handle)。 我目前的想法是,封装一个handle resumer,让多个事件manager都指向同一个 handle resumer的weak_ptr。这个resumer的生命周期可以和 awaitable
共存,一旦 awaitable
消失,这次的await也就结束了。
libcopp 所有组件都是可拆卸和自定义的,所以剩下还有些细节就是接入哪些东西的哪些接口,如果拆卸掉某些组件之后怎么保证这些接入仍然可用和是否要支持 libcopp 内yield或是await其他C 协程对象。这些在后面结合一些实际应用再取舍吧。
最后,贴一个更详细一点的C 20 Coroutine生成的汇编( https://gist.github.com/owent/aa7b093caddcea5a79f32d0ebf4efa88 )。如果上面有哪些理解不对的地方或者建议,欢迎有兴趣的小伙伴们一起来交流探讨哈。