C++ 异步编程之协程代码实践

2024-08-12 15:40:05 浏览数 (2)

引言

异步编程是实际开发当中不可或缺的一部分,尤其是在处理 I/O 操作、网络请求、用户界面响应等需要高并发场景时。进程和线程我们做研发的可能了解的比较多,虽然协程的概念很早就出现了,但语言层面上支持相对比较晚,直到C 20才正式被引入。本文分享一下笔者在工程上使用协程的一些实践和思考总结。

进程 vs 线程 vs 协程

用一个表格对比下进程、线程和协程之间的区别:

特征

进程

线程

协程

定义

独立的执行环境,拥有自己的地址空间。

进程内的执行单元,共享进程的资源。

轻量级的“线程”,不由操作系统直接管理,而是由应用程序控制。

资源消耗

高,每个进程都有独立的内存等资源。

较低,线程之间共享内存和资源。

最低,协程共享线程资源,切换开销小。

创建和销毁成本

高,涉及到操作系统的资源分配和回收。

中等,比进程轻量,但仍需操作系统管理。

非常低,由程序语言或框架层面控制。

控制复杂度

高,需要操作系统参与调度和同步。

中等,线程之间的同步和通信需要细致处理。

低,通常在一个线程内,同步和通信更简单。

并发性能

中等,进程间通信(IPC)开销较大。

高,线程之间切换和通信相对高效。

高,协程切换开销非常小,适合高并发场景。

应用场景

适合需要独立资源和保护的应用。

适合需要并行处理和资源共享的应用。

适合IO密集型和高并发的应用。

每种技术都有其适用的场景:

  • 进程:适合于需要独立运行和资源隔离的大型应用程序,如服务器的不同服务组件。
  • 线程:适合于需要并行处理任务并共享内存资源的场景,如多核处理器上的并行计算。
  • 协程:特别适合处理高并发的I/O密集型任务,如现代Web服务器和网络应用。

笔者主要是从事应用开发,进程一般情况下用的比较少,只有在需要实现跨进程通信的时候才会涉及到。线程就用得比较多,通常会使用线程池来管理,进而减少创建和销毁带来的开销。协程因为非常轻量,日常业务开发当中,比如发起网络请求、I/O操作和简单的异步操作,可以用同步的方式写异步代码,也能更便捷的控制协程的生命周期,不受系统管理,能给研发带来更多的灵活性。

Boost.Asio 异步模型

Boost.Asio 简介

Boost.Asio是一个用于C 的跨平台库,它提供了一组用于处理异步输入/输出(I/O)的工具和组件。它是Boost库的一部分,一个非常流行的C 库集合,旨在提供可移植且高质量的通用组件。 Boost.Asio主要用于网络和低级硬件交互,支持TCP、UDP、串行端口等协议。它不仅限于网络编程,也可以用于构建任何需要异步I/O操作的应用程序,比如文件处理、定时器等。异步I/O是指启动一个I/O操作后,不需要等待其完成即可继续执行其他任务。这对于需要高性能和响应性能的应用程序非常有用,因为它可以帮助你有效地使用系统资源,防止应用程序在等待I/O操作完成时空闲。 Boost.Asio提供了一个强大的异步模型,通过使用回调函数、绑定器和协程等技术,使得编写异步代码更加直观和简洁。此外,它也有同步操作的支持,使得用户可以根据需要选择最适合自己的编程风格。

图引自:https://think-async.com/Asio/

因为C 在语言层面需要将编译器升级至C 20才支持协程,包括关键字co_awaitco_returnco_yield. 我们的项目工程使用了Boost.Asio库可以在不支持C 20的环境中也可以使用协程,相比之下它提供了向后的兼容性。C 20 协程提供了一种更为现代和符合直觉的方式来处理异步代码,允许开发者以类似同步代码的方式编写异步逻辑,这极大简化了代码的复杂性。

Post vs CoSpawn

在 Boost.Asio 中,postco_spawn 是两种常用的处理异步操作的方法。post 用于将任务异步地提交到执行器(如 io_context)上执行,而 co_spawn 则是用于启动协程,使得异步代码的写法更接近同步代码的风格。

示例代码:

代码语言:javascript复制
#include <boost/asio.hpp>
#include <boost/asio/experimental/co_spawn.hpp>
#include <boost/asio/experimental/detached.hpp>
#include <iostream>
#include <chrono>

namespace asio = boost::asio;
using namespace std::chrono_literals;

asio::awaitable<void> async_print(const std::string& message) {
    co_await asio::this_coro::executor.sleep_for(1s);
    std::cout << message << std::endl;
}

int main() {
    asio::io_context io_context;

    // 使用 post 提交一个简单的任务
    asio::post(io_context, []() {
        std::cout << "Hello from post!n";
    });

    // 使用 co_spawn 启动一个协程
    asio::experimental::co_spawn(io_context, async_print("Hello from coroutine!"), asio::experimental::detached);

    // 运行 io_context 直到所有作业完成
    io_context.run();

    return 0;
}

在这个例子中:

  • async_print 是一个协程函数,它等待 1 秒钟然后打印一条消息。这个函数返回 asio::awaitable<void>,表明它是一个异步协程。
  • post 函数用于提交一个 lambda 函数到 io_context。此 lambda 函数直接打印一条消息。
  • co_spawn 函数用于在 io_context 的执行器上启动 async_print 协程。第三个参数 asio::experimental::detached 表示协程的完成是“分离”的,即不需要等待协程完成。

协程的一些代码实践

针对Boost.Asio协程实现的封装

以下的一些代码有针对Boost.Asio库中关于协程相关的封装,比如:

简化命名空间声明和变量定义

代码语言:javascript复制
namespace asio = boost::asio;
using error_code = boost::system::error_code;

template <typename T>
using awaitable = boost::asio::awaitable<T>;
constexpr cross::comm::StrictDetachedType detached;        // default use strict detached, instead of asio::detached
constexpr cross::comm::TolerantDetachedType tol_detached;  // tolerant, like asio::detached, but with exception logging
using boost::asio::use_awaitable;
using boost::asio::experimental::awaitable_operators::operator&&;
using boost::asio::experimental::awaitable_operators::operator||;
using await_token_t = asio::as_tuple_t<asio::use_awaitable_t<>>;
constexpr await_token_t await_token;
  • 简化代码中对Boost.Asio和错误码的引用
  • 模版别名定义,简化boost::asio::awaitable的协程返回类型声明
  • 引入user_awaitable以及逻辑与和逻辑或操作符,允许在协程中组合多个异步操作
  • 定义便于协程支持的异步操作,返回元组的结果的await_token

async_signal.h

代码语言:javascript复制
#ifndef CROSS_COMM_ASYNC_SIGNAL_H
#define CROSS_COMM_ASYNC_SIGNAL_H

#include <optional>

#include "boost/asio/any_io_executor.hpp"
#include "boost/asio/deferred.hpp"
#include "boost/asio/experimental/parallel_group.hpp"
#include "boost/asio/post.hpp"
#include "boost/asio/steady_timer.hpp"
#include "boost/signals2/signal.hpp"
#include "boost/smart_ptr/local_shared_ptr.hpp"

namespace cross::comm {

template <typename CompletionToken, typename... SigArgs>
auto AsyncWaitSignal(boost::asio::any_io_executor ex, boost::signals2::signal<void(SigArgs...)> *sig,
                     CompletionToken &&token) {
  return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, SigArgs...)>(
      [](auto handler, auto ex, auto sig) mutable {
        auto sl = boost::asio::get_associated_cancellation_slot(handler);
        auto wrapper = std::make_shared<std::pair<decltype(handler), bool>>(std::move(handler), false);
        auto conn = sig->connect_extended([wrapper, ex](const auto &conn, SigArgs &&...args) mutable {
          // maybe in another thread in the callback
          conn.disconnect();
          boost::asio::post(ex,
                            [wrapper = std::move(wrapper),
                             args = std::make_tuple(boost::system::error_code{}, std::forward<SigArgs>(args)...)]() {
                              // now in ex's thread
                              if (wrapper->second) return;
                              wrapper->second = true;
                              std::apply(wrapper->first, std::move(args));
                            });
        });
        if (sl.is_connected()) {
          sl.assign([conn = std::move(conn), ex,
                     weak_wrapper = std::weak_ptr<typename decltype(wrapper)::element_type>(wrapper)](
                        boost::asio::cancellation_type_t) {
            auto wrapper = weak_wrapper.lock();  // acquire shared_ptr of handler before conn disconnect
            conn.disconnect();
            if (wrapper) {
              boost::asio::post(ex, [wrapper = std::move(wrapper)]() {
                // now in ex's thread
                if (wrapper->second) return;
                wrapper->second = true;
                std::tuple<boost::system::error_code, SigArgs...> canceled_args;
                std::get<0>(canceled_args) = boost::asio::error::operation_aborted;
                std::apply(wrapper->first, std::move(canceled_args));
              });
            }
          });
        }
      },
      token, std::move(ex), sig);
}

template <typename CompletionToken, typename... SigArgs>
auto AsyncWaitSignalWithTimeout(boost::asio::any_io_executor ex, boost::signals2::signal<void(SigArgs...)> *sig,
                                std::chrono::milliseconds timeout, CompletionToken &&token) {
  return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, SigArgs...)>(
      [](auto handler, auto ex, auto sig, auto timeout) mutable {
        auto sl = boost::asio::get_associated_cancellation_slot(handler);
        boost::local_shared_ptr<boost::asio::steady_timer> timer(new boost::asio::steady_timer(ex, timeout));
        boost::asio::experimental::make_parallel_group(timer->async_wait(boost::asio::deferred),
                                                       AsyncWaitSignal(ex, sig, boost::asio::deferred))
            .async_wait(boost::asio::experimental::wait_for_one(),
                        [timer, handler = std::move(handler)](
                            std::array<std::size_t, 2> completion_order, boost::system::error_code ec1,
                            boost::system::error_code ec2, SigArgs &&...args) mutable {
                          if (completion_order[0] == 0 && !ec1) {
                            std::tuple<boost::system::error_code, SigArgs...> timeout_args;
                            std::get<0>(timeout_args) = boost::asio::error::timed_out;
                            std::apply(handler, std::move(timeout_args));
                            return;
                          }
                          std::apply(handler, std::make_tuple(ec2, std::forward<SigArgs>(args)...));
                        });
        if (sl.is_connected()) {
          sl.assign([timer](boost::asio::cancellation_type_t) { timer->cancel(); });
        }
      },
      token, std::move(ex), sig, timeout);
}

// callback style, handler MUST be copyable, so coroutine is not suitable for this
template <typename Handler, typename... SigArgs>
boost::signals2::connection AsyncConnectSignal(boost::asio::any_io_executor ex,
                                               boost::signals2::signal<void(SigArgs...)> *sig, Handler &&handler) {
  return sig->connect_extended(
      [ex = std::move(ex), handler = std::move(handler)](const auto &conn, SigArgs &&...args) mutable {
        boost::asio::post(ex, [handler, conn, args = std::make_tuple(std::forward<SigArgs>(args)...)]() {
          if (conn.connected()) std::apply(handler, std::move(args));
        });
      });
}

}  // namespace cross::comm

#endif  // CROSS_COMM_ASYNC_SIGNAL_H

AsyncWaitSignal

这个模板函数用于异步等待信号的触发。它接收一个执行器(ex)、一个信号对象指针(sig)和一个完成令牌(token)。函数内部使用boost::asio::async_initiate来包装异步操作。

  • 内部逻辑:
    • 连接到信号,当信号触发时,使用boost::asio::post将回调函数发布到指定的执行器上执行。这确保了回调是在正确的上下文中执行。
    • 使用了std::shared_ptr来管理回调中的状态,确保在异步环境中安全地使用。
    • 支持取消操作,如果与异步操作关联的取消槽被触发,则断开信号连接,并通过执行器发布一个表示操作被取消的回调。

AsyncWaitSignalWithTimeout

这个函数在AsyncWaitSignal的基础上增加了超时机制。如果在指定的时间内信号没有被触发,则触发超时处理逻辑。

  • 内部逻辑:
    • 创建一个steady_timer,并与信号等待操作并行启动。
    • 使用boost::asio::experimental::make_parallel_group来组合定时器和信号等待操作,这允许同时等待两个异步操作。
    • 使用async_wait等待两个操作中的任意一个完成。根据完成的操作类型(定时器或信号),调用相应的处理逻辑。

AsyncConnectSignal

这个函数用于将用户定义的回调连接到一个信号。

  • 内部逻辑:
    • 使用信号的connect_extended方法注册回调。
    • 回调中使用boost::asio::post确保回调在正确的执行器上执行。
    • 检查连接状态,确保在信号仍然连接时执行用户的处理逻辑。

实现一个协程方法

定义一个协程方法,使用awaitable 来声明协程或异步的返回类型。

代码语言:javascript复制
awaitable<void> mock_pay(std::string auth_code) {
  auto [ec, out_trade_no] = co_await PayRequest::SimulateMchPay(auth_code, 1);
  if (ec) {
    LOG_E("sim mch pay fail, ec: {} out_trade_no: {}", ec, out_trade_no);
  } else {
    LOG_I("sim mch pay out_trade_no: {}", out_trade_no);
  }
  co_return;
}

使用同步的代码风格写异步代码:

代码语言:javascript复制
 co_await mock_pay(auth_code);

解析一下:

  • co_await:一元运算符,语义是挂起协程,并将程序控制权返回给调用者。
  • awaitable: 支持co_await运算符的类型,表示可等待对象。
  • co_return:用于从协程返回值,并标志着协程的结束。这与传统的 return 语句类似,但它是专为协程设计的,确保在返回值之前正确地清理和挂起协程状态。

实现一个timer等待

代码语言:javascript复制
  asio::steady_timer timeout(Threads::MainThread()->Executor(), std::chrono::seconds(2));
  co_await timeout.async_wait(await_token);

解析一下:

  • asio::steady_timer 是 Boost.Asio 提供的一个用于精确计时的类。
  • Threads::MainThread()->Executor() 获取了主线程的执行器(Executor)。这个执行器是处理异步事件的上下文。
  • std::chrono::seconds(2) 指定定时器在两秒后激活。
  • async_wait 是一个异步操作,当定时器达到指定的时间后,它被触发。
  • await_token 是一个用于控制异步等待行为的对象。在 Boost.Asio 的 C 20 协程支持中,通常使用一种称为 use_awaitable 的特殊对象作为 await_token

实现等待一个超时异步信号

代码语言:javascript复制
 auto [ec, result] = co_await comm::AsyncWaitSignalWithTimeout(
      this_thread::Executor(), SystemInterface::Instance()->NetworkScanWifiCompletedSig(), std::chrono::seconds(10),
      await_token);

以上这段代码将异步信号处理和超时逻辑封装到AsyncWaitSignalWithTimeout方法中,开发者实现相应的信号处理逻辑和传递超时参数即可,代码非常简洁易读。等待异步信号处理的在日常开发中应用非常常见,按传统的实现方式会非常繁琐,使用协程之后就变得容易。这里涉及关于信号的实现,由于篇幅有限,这里后续有机会继续补充相应的实践,更详细的参考:

https://www.boost.org/doc/libs/1_84_0/doc/html/signals2.html

实现类似Promise的并发模型

代码语言:javascript复制
 auto results = co_await (ShorkLinkQueryPayResult(auth_code) || AsyncWaitNetworkPushMessage(auth_code));

这段代码展示的是使用 ||逻辑或操作符来实现多任务并行请求,等待两个异步任务,只要一个成功即视为成功。

ShorkLinkQueryPayResult代码示例:

这段代码展示的是通过短连接来实现查询支付结果,通过while循环结合timer实现一个定时轮询查单逻辑。可以看到每一次轮询都会调用co_await来等待异步任务FetchPayResult结果,也是通过协程的方式来处理网络请求。

AsyncWaitNetworkPushMessage代码示例:

这段代码展示的通过异步信号带超时时间,等待后台推送消息来实现推拉结合的方式查询支付结果。

除了逻辑或||,自然也可以通过逻辑与&&来实现等待多个异步任务的执行结果。对应的其实类似JavaScript中Promise机制中的Promise.all()Promise.any(),使用async/await语法糖即能实现类似的效果。

总结

本文介绍了协程的基本概念和用法,通过使用Boost.Asio框架实现了高效的协程封装,使用同步的方式编写异步代码带来的简洁性和代码健壮性,无需处理复杂的状态扭转,让开发更好的关注业务代码的实现,用更低的成本实现复杂的并发任务。笔者提供了我们在工程中常见的协程使用案例,比如:

  • 使用awaitable来声明一个协程方法
  • 使用asio::steady_timer来实现定时逻辑
  • 使用boost::signals2::signalpost 方法来实现复杂的异步信号处理
  • 使用 &&|| 来实现类似Promise机制中的并发任务模型

通过以上的实践,基本可以满足90%以上业务开发当中的的异步编程场景,未来也可以继续深入学习异步编程的本质,探索更加高效和优雅的实现方式。

0 人点赞