C++20 Coroutine

2023-03-17 15:25:39 浏览数 (3)

前言

最近的新闻里 C 20 已经确认的内容里已经有了协程组件,之前都是粗略看过这个协程草案。最近抽时间更加系统性的看了下接入和实现细节。

我的测试代码都是在MSVC下开启 /await 选项后测试的,在我本地的Linux clang环境中,可以通过 LLVM_CLANG_PREFIX/bin/clang -std=c 2a -O0 -g -ggdb -stdlib=libc -fcoroutines-ts -lc -lc abi -Wl,-rpath=LLVM_CLANG_PREFIX/lib/ test.cpp 编译和运行。

在gcc 10 中,可以使用 g -std=c 20 -O0 -g -ggdb -fcoroutines 并把所有的 std::experimental:: 都换成 std:: 之后编译运行。

LLVM Clang libc /libc abi的编译安装脚本可以参见: https://github.com/owent-utils/bash-shell/tree/master/LLVM&Clang Installer/7.0/

C 20 的协程基本原理

C 20 整个协程体系是 “无栈协程” 的思路,整个功能是需要结合编译器功能和STL来配合实现的。主要就是三个关键字(co_yieldco_awaitco_return)和围绕这三个关键字的接入。无栈协程对API的设计是有要求的,C 20 Coroutine也不例外, 编译器在检测到内部有使用 这三个关键字时会对函数的流程做patch,然后它的返回值类型必须符合你所使用的关键字的规范。这三个关键字的规范要求不太一样,下面会列举。

我原本以为的会放在协程的 awaiter 或者 handle 对象闭包里,然后由编译器分析和对闭包内的各级对象进行扩充的引用(类似Rust的那种实现)。但是在测试的MSVC和Clang的协程流程的过程中发现,实际上还是另外堆上分配空间来保存协程函数的栈上数据,并用这种方式实现Zero-Copy的。协程函数的执行栈和主函数执行栈并不在一个地址段内,这和之前猜想的不太一样。所以,C 20 的协程也不能完全说是 “无栈” ,只是在协程函数中需要能够评估出来它需要多少栈空间存数据,不像有栈协程那样会浪费比较大的地址空间且不利于内存页复用。

同时受限于这种设计,在C 20 的协程函数里,动态栈分配是不受支持的。在MSVC下,如果你使用了动态栈分配的函数 ( _alloca ) ,直接编译就不通过了。而在gcc/clang 下如果你使用了动态栈分配的函数 ( alloca ) ,分配出来的栈地址是不会受到协程的管理(即:多个协程分配出来的地址可能是重合的),在使用的时候用户得自己保证如果涉及协程且如何切出的话,运行结果不受这部分动态长度栈数据的影响,因为可能会被其他协程改掉(简单地说就是动态分配出来的栈只能在 co_yieldco_await 之前使用)。

不过我觉得类似GCC动态栈的那种方案可以让它支持动态栈空间,就是在栈溢出的signal里再mmap一段地址进去,按需增大栈空间。但是这玩意性能被诟病,信号和缺页中断都不稳定且栈空间地址分散不利于CPU Cache,估计最后也不会被采纳吧。

我目前看的提案以 N4736 为准(还有个使用文档是 p0973r0 )。一旦一个函数被断定为协程函数,那么它会被扩充为如下形式:

2019-09-29: 更新文档 https://en.cppreference.com/w/cpp/language/coroutines 这个 N4775 比 N4736 完整得多 : http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4775.pdf 后面会整合进 P0912R5: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p0912r5.html 目前还没有太大的变化。

代码语言:javascript复制
COROUTINE_OBJECT func(args...) {
    try {
        P p(promise_constructor_arguments);
        // 这个P是自己定义的 using P = COROUTINE_OBJECT::promise_type;
        // 文档上说promise_constructor_arguments 是空或者函数的参数的左值传入 args... ,但是目前版本的MSVC还仅支持空参数列表

        COROUTINE_OBJECT r = p.get_return_object();  //             MSVC 1, Clang 2
        co_await p.initial_suspend();                // 初始化接口  MSVC 2, Clang 1
        // 上面这两行是MSVC的顺序,在clang里上面两行的顺序相反

        try {
            // 原函数体 ...
            p.return_void() or p.return_value(RET) // 取决于函数体里有没有 co_return RET
        } catch(...) {
            p.unhandled_exception();    // 未捕获的异常接口
        }

    final_suspend:
        co_await p.final_suspend();     // final suspend point

        return r;
    } catch(...) {
        return COROUTINE_OBJECT::promise_type::get_return_object_on_allocation_failure(); // noexcept
    }
}

关键字 co_await 先简单理解为判定是否需要切出,后面会有更加详细一点的解释。 而协程函数一切的核心都在于上面的 COROUTINE_OBJECT 类型。里面有一些规范, co_yieldco_return 涉及 COROUTINE_OBJECT 里的 COROUTINE_OBJECT::promise_type 类型。必须实现某些公共接口和函数功能接口。而返回到外层的 COROUTINE_OBJECT 对象里也需要保存协程的handle(目前是 std::experimental::coroutine_handle<PROMISE_TYPE> ),以用于后续控制协程的上下文切换使用。 同样, 下面的例子因为必须写一个协程入口对象,所以有一部分比较烦杂的必须接入的代码。

结构如下:

协程闭包类型(promise)

要支持协程函数,首先要准备一个包装的类型,里面包含 promise_type ,然后提供基本的创建、维护handle的函数。比如:

代码语言:javascript复制
struct coroutine_task {
    struct promise_type {
        coroutine_task get_return_object() {
            return coroutine_task{};
        }
        bool initial_suspend() const {
            return false;
        }
        bool final_suspend() const {
            return false;
        }
    };
};

然后,就可以声明使用协程函数 coroutine_task f(); 了。但是要让他转变为协程函数,还需要至少接入一样协程关键字才行。我们先从最基本的 co_await 开始。

关键字 - co_await

关键字 co_await 是一个操作符,所以我们只要实现这个操作符重载就可以实现协程等待任意类型。 当然,返回的类型要求实现 bool await_ready(T&) noexceptvoid await_suspend(T&, std::experimental::coroutine_handle<T>) noexceptvoid await_resume(T&) noexcept 这三个接口。比如我们实现一个 co_wait 后结束的结构,可以按下面这种接入方式:

代码语言:javascript复制
struct wait_some_times {
    int left_times;
    std::experimental::coroutine_handle<> handle;
    wait_some_times(int t) : left_times(t), handle(nullptr) {}
};

struct suspend_some_times {
    wait_some_times& d;
    suspend_some_times(wait_some_times& _d): d(_d) {}
    bool await_ready() noexcept {
        std::cout << "call await_ready: " << d.left_times << std::endl;
        return d.left_times <= 0;
    }

    void await_suspend(std::experimental::coroutine_handle <> h) noexcept {
        // 记下来handle以便后面resume用
        d.handle = h;

        std::cout << "call await_suspend: " << (--d.left_times) << std::endl;
    }

    void await_resume() noexcept {
        std::cout << "call await_resume: " << d.left_times << std::endl;
        d.handle = nullptr;
    }
};

然后把整个连起来,完整的例子如下:

代码语言:javascript复制
#include <iostream>
#include <iomanip>
#include <vector>

#include <memory>

#include <experimental/coroutine>

struct wait_some_times {
    int left_times;
    std::experimental::coroutine_handle<> handle;
    wait_some_times(int t) : left_times(t), handle(nullptr) {}
};

struct suspend_some_times {
    wait_some_times& d;
    suspend_some_times(wait_some_times& _d) : d(_d) {}
    bool await_ready() noexcept {
        std::cout << "call await_ready: " << d.left_times << std::endl;
        return d.left_times <= 0;
    }

    void await_suspend(std::experimental::coroutine_handle <> h) noexcept {
        // 记下来handle以便后面resume用
        d.handle = h;

        std::cout << "call await_suspend: " << (--d.left_times) << std::endl;
    }

    void await_resume() noexcept {
        std::cout << "call await_resume: " << d.left_times << std::endl;
        d.handle = nullptr;
    }
};

struct coroutine_task {
    struct promise_type {
        coroutine_task get_return_object() {
            return coroutine_task{};
        }
        auto initial_suspend() {
            return std::experimental::suspend_never{};
        }
        auto final_suspend() {
            return std::experimental::suspend_never{};
        }

        void unhandled_exception() {}
        void return_void() {}
    };
};

auto operator co_await(wait_some_times& x) noexcept {
    return suspend_some_times{ x };
}

coroutine_task f(wait_some_times& waiter) {
    std::cout << "begin to co_await" << std::endl;
    co_await waiter; // 只有前三次会协程切出
    co_await waiter;
    co_await waiter;
    co_await waiter; // 这之后await_ready返回true了,不会再切出
    co_await waiter;
    std::cout << "end of corotine" << std::endl;
}

int main(int argc, char* argv[]) {
#ifdef __cpp_coroutines
    std::cout << "__cpp_coroutines: " << __cpp_coroutines << std::endl;
#endif
    wait_some_times waiter{ 3 };
    f(waiter);

    while (waiter.handle && !waiter.handle.done()) {
        std::cout << "about to resume: " << waiter.left_times << std::endl;
        // 这里用传出的handle来恢复切入协程
        waiter.handle.resume();
    }

    return 0;
}

输出如下:

代码语言:javascript复制
__cpp_coroutines: 201703
begin to co_await
call await_ready: 3
call await_suspend: 2
about to resume: 2
call await_resume: 2
call await_ready: 2
call await_suspend: 1
about to resume: 1
call await_resume: 1
call await_ready: 1
call await_suspend: 0
about to resume: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
end of corotine

关键字 - co_yield

关键字 co_yield 要求实现 COROUTINE_OBJECT::promise_type::yield_value(参数) 。比较贴近于单独一次异步调用的实现。

简单的描述 co_yield VALUE 就是相当于 co_await p.yield_value(VALUE)

代码语言:javascript复制
#include <iostream>
#include <iomanip>
#include <vector>

#include <memory>

#include <experimental/coroutine>

struct test_rpc_generator {
    test_rpc_generator(const test_rpc_generator&) = delete;
    test_rpc_generator(test_rpc_generator&& other): coro(other.coro) {
        other.coro = nullptr;
    };
    ~test_rpc_generator() {
        if (coro) {
            coro.destroy();
        }
    }

    struct promise_type;
    using handle = std::experimental::coroutine_handle<promise_type>;

    struct promise_type {
        int* current_value;
        static auto get_return_object_on_allocation_failure() {
            return test_rpc_generator{nullptr};
        }

        auto get_return_object() {
            return test_rpc_generator{handle::from_promise(*this)};
        }

        auto initial_suspend() {
            current_value = nullptr;
            return std::experimental::suspend_never{};
        }

        auto final_suspend() {
            return std::experimental::suspend_always{};
        }

        void unhandled_exception() {
            std::terminate();
        }

        void return_void() {
        }

        auto yield_value(int* value) {
            current_value = value;
            return std::experimental::suspend_always{};
        }
    };


    int* value() const {
        if (coro) {
            return coro.promise().current_value;
        }

        return 0;
    }

    bool move_next(int rpc_result) {
        if (coro && coro.promise().current_value) {
            *coro.promise().current_value = rpc_result;
        }

        return coro ? (coro.resume(), !coro.done()) : false; 
    }

    bool await_ready() const {
        return !coro || coro.done();
    }

private:
    test_rpc_generator(handle h) : coro(h) {}
    handle coro;
};

test_rpc_generator f() {
    int rpc_res1, rpc_res2;
    co_yield &rpc_res1;
    // _alloca(rpc_res1);
    std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << std::endl;

    co_yield &rpc_res2;
    // _alloca(rpc_res2);
    std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << ", rpc_res2: "<< rpc_res2 <<"(@"<< &rpc_res2<< ")"<< std::endl;
}

int main(int argc, char * argv[]) {
#ifdef __cpp_coroutines
    std::cout<< "__cpp_coroutines: "<< __cpp_coroutines<< std::endl;
#endif

    int rpc_fake_data = 1;
    auto g1 = f();
    auto g2 = f();
    void* detect_addr = malloc(4000);
    std::cout << "detect_addr:" << detect_addr << std::endl;
    free(detect_addr);

    for (bool is_continue = true; is_continue; is_continue = (!g1.await_ready() || !g2.await_ready())) {
        if (!g1.await_ready()) {
            g1.move_next(   rpc_fake_data);
            std::cout << "g1 value:" << g1.value() << std::endl;
        }

        if (!g2.await_ready()) {
            g2.move_next(   rpc_fake_data);
            std::cout << "g2 value:" << g2.value() << std::endl;
        }
    }
    return 0;
}

我这里一次示例输出是:

代码语言:javascript复制
__cpp_coroutines: 201703
detect_addr:00881BD0
resumed got rpc_res1: 2(@0087F96C)
g1 value:0087F980
resumed got rpc_res1: 3(@00881B1C)
g2 value:00881B30
resumed got rpc_res1: 2(@0087F96C), rpc_res2: 4(@0087F980)
g1 value:0087F980
resumed got rpc_res1: 3(@00881B1C), rpc_res2: 5(@00881B30)
g2 value:00881B30

关键字 - co_return

最后一个是 co_return 。 这个关键字主要是用于直接退出协程函数的, 因为协程函数的返回值是我们自己定义的这个 COROUTINE_OBJECT , 所以函数逻辑附带的返回值就要用这个关键字来实现。 这个关键字要求实现 COROUTINE_OBJECT::promise_type::return_value(参数) 或者 COROUTINE_OBJECT::promise_type::return_void() 。如果协程函数中有 co_return VALUE 。则是最终调用了 COROUTINE_OBJECT::promise_type::return_value(VALUE) , 否则是调用 COROUTINE_OBJECT::promise_type::return_void()

我们可以把协程函数的最终结果放在这里面来实现转储到某个地方。这部分的代码和 yield 的很像。就不另外贴了,下面贴一个全部整合到一起的吧。

全功能整合到一起

我们来一个功能完整,并且贴近单线程工程并且时候异步IO的实践的例子。

代码语言:javascript复制
#include <iostream>
#include <iomanip>
#include <vector>

#include <memory>

#include <experimental/coroutine>

static std::vector<std::pair<int*, std::experimental::coroutine_handle<> > > g_test_rpc_manager;
static int g_test_rpc_fake_data = 0;

struct test_rpc_generator {
    struct test_rpc_data {
        int final_value;
        int yield_times;

        std::vector<std::experimental::coroutine_handle<> > follower;
    };

    struct promise_type;
    using data_ptr = std::shared_ptr<test_rpc_data>;

    struct promise_type {
        data_ptr data;
        static auto get_return_object_on_allocation_failure() {
            return test_rpc_generator{ nullptr };
        }

        auto get_return_object() {
            data = std::make_shared<test_rpc_data>();
            if (data) {
                data->final_value = 0;
                data->yield_times = 0;
            }
            return test_rpc_generator{ data };
        }

        auto initial_suspend() {
            return std::experimental::suspend_never{}; // STL提供了一些自带的awaiter实现,我们其实很多情况下也不需要另外写,直接用STL就好了
        }

        auto final_suspend() {
            return std::experimental::suspend_always{}; // 和上面一样,也是STL自带的awaiter实现
        }

        void unhandled_exception() {
            std::terminate();
        }

        // 用以支持 co_return
        void return_value(int v) {
            // 最终co_return时保存最终数据
            if (data) {
                data->final_value = v;

                auto followers = std::move(data->follower);
                for (auto& h : followers) {
                    h.resume();
                }
            }
        }

        // 用以支持 co_yield
        auto yield_value(int* value) {
            // 每次调用都会执行,创建handle用以后面恢复数据
            g_test_rpc_manager.emplace_back(std::make_pair(value, std::experimental::coroutine_handle<>::from_address(
                std::experimental::coroutine_handle<promise_type>::from_promise(*this).address()
            )));

            if (data) {
                  data->yield_times;
            }

            return std::experimental::suspend_always{};
        }
    };

    // 下面的接入用侵入式的方式支持 co_await test_rpc_generator
    // MSVC 目前支持使用非侵入式的方式实现,但是clang不支持
    bool await_ready() noexcept {
        return value() > 0;
    }

    void await_resume() {
        std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator resume for " << yield_times() << " time(s)" << std::endl;
    }

    void await_suspend(std::experimental::coroutine_handle<> h) {
        std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator yield for " << yield_times() << " time(s), wait for " << h.address() << std::endl;

        // 记录要恢复父协程
        if (h) {
            add_follower(h);
        }
    }

    int value() const {
        if (data) {
            return data->final_value;
        }
        return 0;
    }

    int yield_times() const {
        if (data) {
            return data->yield_times;
        }

        return -1;
    }

    void add_follower(std::experimental::coroutine_handle<> h) {
        if (data) {
            data->follower.emplace_back(std::move(h));
        }
    }
private:
    test_rpc_generator(data_ptr d) : data(d) {}
    data_ptr data;
};

// 异步协程函数
test_rpc_generator f() {
    int rpc_res1, rpc_res2;
    co_yield &rpc_res1;
    // _alloca(rpc_res1);
    std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << std::endl;

    co_yield &rpc_res2;
    // _alloca(rpc_res2);
    std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << ", rpc_res2: " << rpc_res2 << "(@" << &rpc_res2 << ")" << std::endl;

    // 模拟多次RPC然后返回最终结果
    co_return rpc_res1 * 100   rpc_res2;
}

// 这里模拟生成数据
void test_rpc_manager_run() {
    std::vector<std::pair<int*, std::experimental::coroutine_handle<> > > rpc_manager;
    g_test_rpc_manager.swap(rpc_manager);

    for (auto& generator : rpc_manager) {
        if (generator.first) {
            *generator.first =   g_test_rpc_fake_data;
        }

        if (generator.second && !generator.second.done()) {
            generator.second.resume();
        }
    }
}

struct test_task {
    using ptr_t = std::shared_ptr<test_task>;

    test_task(int ms) : status(0), max_status(ms) {}

    bool is_ready() const noexcept {
        return status >= max_status;
    }

    int status;
    int max_status;
};

struct test_task_future {
    struct promise_type;
    using ptr_t = std::shared_ptr<test_task>;
    using handle = std::experimental::coroutine_handle<promise_type>;

    struct promise_type {
        static auto get_return_object_on_allocation_failure() {
            return test_task_future{ nullptr };
        }

        auto get_return_object() {
            return test_task_future{ handle::from_promise(*this) };
        }

        auto initial_suspend() {
            return std::experimental::suspend_never{};
        }

        auto final_suspend() {
            return std::experimental::suspend_always{};
        }

        void unhandled_exception() {
            std::terminate();
        }

        void return_void() {
            for (auto& h : follower) {
                h.resume();
            }
        }

        // 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
        auto yield_value(ptr_t t) {
            task = t;
            return std::experimental::suspend_never{};
        }

        ptr_t task;
        std::vector<handle> follower;
    };

    // 下面的接入用侵入式的方式支持 co_await test_task::ptr_t
    struct awaitable {
        awaitable(const test_task_future& pt) {
            if (pt.coro) {
                data = pt.coro.promise().task;
                coro = pt.coro;
            }
        };

        bool await_ready() const {
            bool ret = !data || data->is_ready();
            if (ret) {
                std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " ready" << std::endl;
            }

            return ret;
        }

        void await_resume() {
            if (data) {
                  data->status;
                std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " resume to " << (data ? data->status : -1) << std::endl;
            }
        }

        void await_suspend(handle h) {
            if (data) {
                std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " suspend test_task_future::handle from " << (data ? data->status : -1) <<
                    ", wait for " << h.address() << std::endl;

            }

            if (coro && h) {
                coro.promise().follower.emplace_back(h);
            }
        }

        ptr_t data;
        handle coro;
    };

    bool done() const {
        return !coro || coro.done();
    }

private:
    test_task_future(handle h) : coro(h) {}
    handle coro;
};

// 接入 co_await test_task::ptr_t
auto operator co_await(const test_task_future & pt) noexcept {
    return test_task_future::awaitable{ pt };
}

test_task_future h(test_task::ptr_t task) {
    // 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
    co_yield task;

    // 模拟任务内部流程并调用外部RPC
    std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
    test_rpc_generator rpc_res = f();
    co_await rpc_res;
    std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << " call f() ret: " << rpc_res.value() << std::endl;
}

test_task_future g(test_task::ptr_t task) {
    // 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
    co_yield task;

    // 等待子任务完成
    while (task && !task->is_ready()) {
        std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
        co_await h(task);
    }
}

int main(int argc, char* argv[]) {
#ifdef __cpp_coroutines
    std::cout << "__cpp_coroutines: " << __cpp_coroutines << std::endl;
#endif

    // 创建一个任务
    test_task::ptr_t task = std::make_shared<test_task>(3);
    // 运行任务
    auto fut = g(task);
    // 模拟从外部获取数据然会恢复协程
    while (!fut.done()) {
        test_rpc_manager_run();
    }

    return 0;
}

这回贴一下linux内clang的输出吧:

代码语言:javascript复制
__cpp_coroutines: 201703
                               g : 280: task 0x1d3c028
                               h : 268: task 0x1d3c028
                   await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3c320
                   await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 0, wait for 0x1d3c040
resumed got rpc_res1: 1(@0x1d3c730)
resumed got rpc_res1: 1(@0x1d3c730), rpc_res2: 2(@0x1d3c734)
                    await_resume : 85: test_rpc_generator resume for 2 time(s)
                               h : 271: task 0x1d3c028 call f() ret: 102
                    await_resume : 229: task 0x1d3c028 resume to 1
                               g : 280: task 0x1d3c028
                               h : 268: task 0x1d3c028
                   await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3c850
                   await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 1, wait for 0x1d3c040
resumed got rpc_res1: 3(@0x1d3cc60)
resumed got rpc_res1: 3(@0x1d3cc60), rpc_res2: 4(@0x1d3cc64)
                    await_resume : 85: test_rpc_generator resume for 2 time(s)
                               h : 271: task 0x1d3c028 call f() ret: 304
                    await_resume : 229: task 0x1d3c028 resume to 2
                               g : 280: task 0x1d3c028
                               h : 268: task 0x1d3c028
                   await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3cd40
                   await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 2, wait for 0x1d3c040
resumed got rpc_res1: 5(@0x1d3d150)
resumed got rpc_res1: 5(@0x1d3d150), rpc_res2: 6(@0x1d3d154)
                    await_resume : 85: test_rpc_generator resume for 2 time(s)
                               h : 271: task 0x1d3c028 call f() ret: 506
                    await_resume : 229: task 0x1d3c028 resume to 3

promise/future 支持

我本地测试的Clang版本(7.0.1)尚未实现支持。 MSVC的标准库用偏特化的方式对 std::promise 和 std::future 实现了协程的接入。它的接入代码如下(为了简短,精简掉了一个偏特化实现,流程和不特化的类似):

代码语言:javascript复制
namespace experimental {
    template <class _Ty, class... _ArgTypes>
    struct coroutine_traits<future<_Ty>, _ArgTypes...> { // defines resumable traits for functions returning future<_Ty>
        struct promise_type {
            promise<_Ty> _MyPromise;

            future<_Ty> get_return_object() {
                return _MyPromise.get_future();
            }

            bool initial_suspend() const {
                return false;
            }

            bool final_suspend() const {
                return false;
            }

            template <class _Ut>
            void return_value(_Ut&& _Value) {
                _MyPromise.set_value(_STD forward<_Ut>(_Value));
            }

            void set_exception(exception_ptr _Exc) {
                _MyPromise.set_exception(_STD move(_Exc));
            }
        };
    };
} // namespace experimental

template <class _Ty>
bool await_ready(future<_Ty>& _Fut) {
    return _Fut._Is_ready();
}

template <class _Ty>
void await_suspend(future<_Ty>& _Fut,
    experimental::coroutine_handle<> _ResumeCb) { // change to .then when future gets .then
    thread _WaitingThread([&_Fut, _ResumeCb] {
        _Fut.wait();
        _ResumeCb();
    });
    _WaitingThread.detach();
}

template <class _Ty>
auto await_resume(future<_Ty>& _Fut) {
    return _Fut.get();
}

可以看到,它的 co_await std::future<T> 挂起是开了个新线程来等待,真他喵暴力,建议不要用。但是还是可以比较容易地让自己的管理器接入 co_await

我们借助STL的协程接入, 可以实现一个最小化的自定义协程支持:

代码语言:javascript复制
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

#include <experimental/coroutine>

struct custom_rpc_generator {};

struct suspend_custom_rpc : public std::experimental::suspend_always {
    void await_suspend(std::experimental::coroutine_handle<> h) noexcept {
        std::thread thd{ [h]() {
            using namespace std;
            std::this_thread::sleep_for(2s);
            std::cout << "start to resume coroutine" << std::endl;
            h.resume();
        } };

        thd.detach();
    }
};

auto operator co_await(const custom_rpc_generator&) {
    return suspend_custom_rpc{};
}

std::future<void> outter_fn() {
    co_await custom_rpc_generator{};
}

int main() {
    auto fut = outter_fn();
    std::cout << "start to wait future" << std::endl;
    fut.wait();
    std::cout << "future finished, ready to exit" << std::endl;
    return 0;
}

总结

总体感觉上,C 20协程为了兼顾灵活和支持非侵入式接入,设计了好几个互相交织的大模块,函数级有处理协程函数内部的 promise_type 、 协程函数对外暴露的交互对象 (我们这里统称为 future) 、 用于协程上下文切换的 handle ,单次切换有用于支持await功能的 awaitable 和一些回调函数接口。几个对象之间的数据共享也并不是很方便。而且和传统有栈协程的区别仅仅是约束了返回值类型,并且可以依次在编译期推断出需要多少栈空间,从而减少浪费。

我打算后面有时间尝试对 libcopp 接入C 协程支持,在研究C 协程的时候也想到几个问题。在和 ultramanhu 讨论了一下以后主要的问题也有了一些初步的解决方案的想法,但是目前细节上还是有一些没太想清楚的地方。

首先是如果业务有自己的线程池,其实还是要由管理层来控制resume,不能直接像STL那样开线程,那基本上就和 std::future<T> say Good-Bye 了。虽然在小心维护的情况下,避免 co_await std::future<T> 也是可以避免STL乱开线程的,但是我觉得一旦使用了,后面就很难控制住。特别是这个C 协程的对象关系互耦合插如此严重的情况下,本身就不容易理解。

第二个问题是调用链。C 协程接口设计是非对称的,我们实际业务中,肯定还是需要对称协程的支持,即子协程结束后能够自动恢复 co_await 它的父协程。这个需要我们自己去实现,上面 std::future<T> 也就是这里是开了个线程去实现的(都开线程了还用协程干啥)。在上面的sample代码中,我是开了一个共享数据区,在 await_suspend 的时候去追加等待链,在 return_void/return_value 的时候去执行父级协程的恢复切入,这里面写成vector是为了支持N个协程await一个协程的情况,实际上用链表会更好一些。这里有个比较“脏”的设计是handle的加入是在 awaitable 执行挂起的时候,而恢复是在 promise_typereturn_void/return_value 事件里。这就分了两个地方。这里的 awaitablepromise_type 和N个协程等一个协程是保持一致的N:1关系。但是 awaitable 仅仅能访问 future , 要让 promise_type 也能访问 future 的话,一种方法是开一个共享数据块,在 promise_type 创建 future 的时候传进去这样了。这也是上面sample代码里 using data_ptr = std::shared_ptr<test_rpc_data>; 的实现。 MSVC的STL的实现也是这样 promise::get_future() 是使用了promise内部的数据创建的future对象。 另外一种尝试的方法是后面task的,进入协程后先 co_yield 一次,然后 yield_value(VALUE) 函数返回 std::experimental::suspend_never。这样不会造成协程挂起,并且可以给 promise_type 注入任意数据。不过这样就有个约定式的规范了,也不是很严谨。这方面等后面支持了 promise_type 的带参数构造函数可能可以好一些。

第三个问题是handle提前结束的问题。在看了MSVC的实现,这个handle是可以copy也可以转换的。copy开销也很小,里面只包含一个和 promise_type 地址有关的指针(就是 promise_type 的地址然后加了 align和padding)。比如一个RPC任务,我可能copy一个handle用来在有数据的时候resume,然后我还会copy一个handle在超时的时候强行resume然后走失败流程。现在这种情况,就是需要在 awaitableawait_suspend 里添加这两个handle到两个manager里,然后在 await_resume 里要把这两个都移除。不管哪个流程,肯定有一个handle是处于resume回调中的,这还涉及递归调用的问题(删除正在resume过程中的handle)。 我目前的想法是,封装一个handle resumer,让多个事件manager都指向同一个 handle resumer的weak_ptr。这个resumer的生命周期可以和 awaitable 共存,一旦 awaitable 消失,这次的await也就结束了。

libcopp 所有组件都是可拆卸和自定义的,所以剩下还有些细节就是接入哪些东西的哪些接口,如果拆卸掉某些组件之后怎么保证这些接入仍然可用和是否要支持 libcopp 内yield或是await其他C 协程对象。这些在后面结合一些实际应用再取舍吧。

最后,贴一个更详细一点的C 20 Coroutine生成的汇编( https://gist.github.com/owent/aa7b093caddcea5a79f32d0ebf4efa88 )。如果上面有哪些理解不对的地方或者建议,欢迎有兴趣的小伙伴们一起来交流探讨哈。

1 人点赞