引言
异步编程是实际开发当中不可或缺的一部分,尤其是在处理 I/O 操作、网络请求、用户界面响应等需要高并发场景时。进程和线程我们做研发的可能了解的比较多,虽然协程的概念很早就出现了,但语言层面上支持相对比较晚,直到C 20才正式被引入。本文分享一下笔者在工程上使用协程的一些实践和思考总结。
进程 vs 线程 vs 协程
用一个表格对比下进程、线程和协程之间的区别:
特征 | 进程 | 线程 | 协程 |
---|---|---|---|
定义 | 独立的执行环境,拥有自己的地址空间。 | 进程内的执行单元,共享进程的资源。 | 轻量级的“线程”,不由操作系统直接管理,而是由应用程序控制。 |
资源消耗 | 高,每个进程都有独立的内存等资源。 | 较低,线程之间共享内存和资源。 | 最低,协程共享线程资源,切换开销小。 |
创建和销毁成本 | 高,涉及到操作系统的资源分配和回收。 | 中等,比进程轻量,但仍需操作系统管理。 | 非常低,由程序语言或框架层面控制。 |
控制复杂度 | 高,需要操作系统参与调度和同步。 | 中等,线程之间的同步和通信需要细致处理。 | 低,通常在一个线程内,同步和通信更简单。 |
并发性能 | 中等,进程间通信(IPC)开销较大。 | 高,线程之间切换和通信相对高效。 | 高,协程切换开销非常小,适合高并发场景。 |
应用场景 | 适合需要独立资源和保护的应用。 | 适合需要并行处理和资源共享的应用。 | 适合IO密集型和高并发的应用。 |
每种技术都有其适用的场景:
- 进程:适合于需要独立运行和资源隔离的大型应用程序,如服务器的不同服务组件。
- 线程:适合于需要并行处理任务并共享内存资源的场景,如多核处理器上的并行计算。
- 协程:特别适合处理高并发的I/O密集型任务,如现代Web服务器和网络应用。
笔者主要是从事应用开发,进程一般情况下用的比较少,只有在需要实现跨进程通信的时候才会涉及到。线程就用得比较多,通常会使用线程池来管理,进而减少创建和销毁带来的开销。协程因为非常轻量,日常业务开发当中,比如发起网络请求、I/O操作和简单的异步操作,可以用同步的方式写异步代码,也能更便捷的控制协程的生命周期,不受系统管理,能给研发带来更多的灵活性。
Boost.Asio 异步模型
Boost.Asio 简介
Boost.Asio是一个用于C 的跨平台库,它提供了一组用于处理异步输入/输出(I/O)的工具和组件。它是Boost库的一部分,一个非常流行的C 库集合,旨在提供可移植且高质量的通用组件。 Boost.Asio主要用于网络和低级硬件交互,支持TCP、UDP、串行端口等协议。它不仅限于网络编程,也可以用于构建任何需要异步I/O操作的应用程序,比如文件处理、定时器等。异步I/O是指启动一个I/O操作后,不需要等待其完成即可继续执行其他任务。这对于需要高性能和响应性能的应用程序非常有用,因为它可以帮助你有效地使用系统资源,防止应用程序在等待I/O操作完成时空闲。 Boost.Asio提供了一个强大的异步模型,通过使用回调函数、绑定器和协程等技术,使得编写异步代码更加直观和简洁。此外,它也有同步操作的支持,使得用户可以根据需要选择最适合自己的编程风格。
图引自:https://think-async.com/Asio/
因为C 在语言层面需要将编译器升级至C 20才支持协程,包括关键字co_await
、co_return
和co_yield
. 我们的项目工程使用了Boost.Asio库可以在不支持C 20的环境中也可以使用协程,相比之下它提供了向后的兼容性。C 20 协程提供了一种更为现代和符合直觉的方式来处理异步代码,允许开发者以类似同步代码的方式编写异步逻辑,这极大简化了代码的复杂性。
Post vs CoSpawn
在 Boost.Asio 中,post
和 co_spawn
是两种常用的处理异步操作的方法。post
用于将任务异步地提交到执行器(如 io_context)上执行,而 co_spawn
则是用于启动协程,使得异步代码的写法更接近同步代码的风格。
示例代码:
代码语言:javascript复制#include <boost/asio.hpp>
#include <boost/asio/experimental/co_spawn.hpp>
#include <boost/asio/experimental/detached.hpp>
#include <iostream>
#include <chrono>
namespace asio = boost::asio;
using namespace std::chrono_literals;
asio::awaitable<void> async_print(const std::string& message) {
co_await asio::this_coro::executor.sleep_for(1s);
std::cout << message << std::endl;
}
int main() {
asio::io_context io_context;
// 使用 post 提交一个简单的任务
asio::post(io_context, []() {
std::cout << "Hello from post!n";
});
// 使用 co_spawn 启动一个协程
asio::experimental::co_spawn(io_context, async_print("Hello from coroutine!"), asio::experimental::detached);
// 运行 io_context 直到所有作业完成
io_context.run();
return 0;
}
在这个例子中:
async_print
是一个协程函数,它等待 1 秒钟然后打印一条消息。这个函数返回asio::awaitable<void>
,表明它是一个异步协程。post
函数用于提交一个 lambda 函数到io_context
。此 lambda 函数直接打印一条消息。co_spawn
函数用于在io_context
的执行器上启动async_print
协程。第三个参数asio::experimental::detached
表示协程的完成是“分离”的,即不需要等待协程完成。
协程的一些代码实践
针对Boost.Asio协程实现的封装
以下的一些代码有针对Boost.Asio库中关于协程相关的封装,比如:
简化命名空间声明和变量定义
代码语言:javascript复制namespace asio = boost::asio;
using error_code = boost::system::error_code;
template <typename T>
using awaitable = boost::asio::awaitable<T>;
constexpr cross::comm::StrictDetachedType detached; // default use strict detached, instead of asio::detached
constexpr cross::comm::TolerantDetachedType tol_detached; // tolerant, like asio::detached, but with exception logging
using boost::asio::use_awaitable;
using boost::asio::experimental::awaitable_operators::operator&&;
using boost::asio::experimental::awaitable_operators::operator||;
using await_token_t = asio::as_tuple_t<asio::use_awaitable_t<>>;
constexpr await_token_t await_token;
- 简化代码中对Boost.Asio和错误码的引用
- 模版别名定义,简化
boost::asio::awaitable
的协程返回类型声明 - 引入
user_awaitable
以及逻辑与和逻辑或操作符,允许在协程中组合多个异步操作 - 定义便于协程支持的异步操作,返回元组的结果的
await_token
async_signal.h
#ifndef CROSS_COMM_ASYNC_SIGNAL_H
#define CROSS_COMM_ASYNC_SIGNAL_H
#include <optional>
#include "boost/asio/any_io_executor.hpp"
#include "boost/asio/deferred.hpp"
#include "boost/asio/experimental/parallel_group.hpp"
#include "boost/asio/post.hpp"
#include "boost/asio/steady_timer.hpp"
#include "boost/signals2/signal.hpp"
#include "boost/smart_ptr/local_shared_ptr.hpp"
namespace cross::comm {
template <typename CompletionToken, typename... SigArgs>
auto AsyncWaitSignal(boost::asio::any_io_executor ex, boost::signals2::signal<void(SigArgs...)> *sig,
CompletionToken &&token) {
return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, SigArgs...)>(
[](auto handler, auto ex, auto sig) mutable {
auto sl = boost::asio::get_associated_cancellation_slot(handler);
auto wrapper = std::make_shared<std::pair<decltype(handler), bool>>(std::move(handler), false);
auto conn = sig->connect_extended([wrapper, ex](const auto &conn, SigArgs &&...args) mutable {
// maybe in another thread in the callback
conn.disconnect();
boost::asio::post(ex,
[wrapper = std::move(wrapper),
args = std::make_tuple(boost::system::error_code{}, std::forward<SigArgs>(args)...)]() {
// now in ex's thread
if (wrapper->second) return;
wrapper->second = true;
std::apply(wrapper->first, std::move(args));
});
});
if (sl.is_connected()) {
sl.assign([conn = std::move(conn), ex,
weak_wrapper = std::weak_ptr<typename decltype(wrapper)::element_type>(wrapper)](
boost::asio::cancellation_type_t) {
auto wrapper = weak_wrapper.lock(); // acquire shared_ptr of handler before conn disconnect
conn.disconnect();
if (wrapper) {
boost::asio::post(ex, [wrapper = std::move(wrapper)]() {
// now in ex's thread
if (wrapper->second) return;
wrapper->second = true;
std::tuple<boost::system::error_code, SigArgs...> canceled_args;
std::get<0>(canceled_args) = boost::asio::error::operation_aborted;
std::apply(wrapper->first, std::move(canceled_args));
});
}
});
}
},
token, std::move(ex), sig);
}
template <typename CompletionToken, typename... SigArgs>
auto AsyncWaitSignalWithTimeout(boost::asio::any_io_executor ex, boost::signals2::signal<void(SigArgs...)> *sig,
std::chrono::milliseconds timeout, CompletionToken &&token) {
return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, SigArgs...)>(
[](auto handler, auto ex, auto sig, auto timeout) mutable {
auto sl = boost::asio::get_associated_cancellation_slot(handler);
boost::local_shared_ptr<boost::asio::steady_timer> timer(new boost::asio::steady_timer(ex, timeout));
boost::asio::experimental::make_parallel_group(timer->async_wait(boost::asio::deferred),
AsyncWaitSignal(ex, sig, boost::asio::deferred))
.async_wait(boost::asio::experimental::wait_for_one(),
[timer, handler = std::move(handler)](
std::array<std::size_t, 2> completion_order, boost::system::error_code ec1,
boost::system::error_code ec2, SigArgs &&...args) mutable {
if (completion_order[0] == 0 && !ec1) {
std::tuple<boost::system::error_code, SigArgs...> timeout_args;
std::get<0>(timeout_args) = boost::asio::error::timed_out;
std::apply(handler, std::move(timeout_args));
return;
}
std::apply(handler, std::make_tuple(ec2, std::forward<SigArgs>(args)...));
});
if (sl.is_connected()) {
sl.assign([timer](boost::asio::cancellation_type_t) { timer->cancel(); });
}
},
token, std::move(ex), sig, timeout);
}
// callback style, handler MUST be copyable, so coroutine is not suitable for this
template <typename Handler, typename... SigArgs>
boost::signals2::connection AsyncConnectSignal(boost::asio::any_io_executor ex,
boost::signals2::signal<void(SigArgs...)> *sig, Handler &&handler) {
return sig->connect_extended(
[ex = std::move(ex), handler = std::move(handler)](const auto &conn, SigArgs &&...args) mutable {
boost::asio::post(ex, [handler, conn, args = std::make_tuple(std::forward<SigArgs>(args)...)]() {
if (conn.connected()) std::apply(handler, std::move(args));
});
});
}
} // namespace cross::comm
#endif // CROSS_COMM_ASYNC_SIGNAL_H
AsyncWaitSignal
这个模板函数用于异步等待信号的触发。它接收一个执行器(ex
)、一个信号对象指针(sig
)和一个完成令牌(token
)。函数内部使用boost::asio::async_initiate
来包装异步操作。
- 内部逻辑:
- 连接到信号,当信号触发时,使用
boost::asio::post
将回调函数发布到指定的执行器上执行。这确保了回调是在正确的上下文中执行。 - 使用了
std::shared_ptr
来管理回调中的状态,确保在异步环境中安全地使用。 - 支持取消操作,如果与异步操作关联的取消槽被触发,则断开信号连接,并通过执行器发布一个表示操作被取消的回调。
- 连接到信号,当信号触发时,使用
AsyncWaitSignalWithTimeout
这个函数在AsyncWaitSignal
的基础上增加了超时机制。如果在指定的时间内信号没有被触发,则触发超时处理逻辑。
- 内部逻辑:
- 创建一个
steady_timer
,并与信号等待操作并行启动。 - 使用
boost::asio::experimental::make_parallel_group
来组合定时器和信号等待操作,这允许同时等待两个异步操作。 - 使用
async_wait
等待两个操作中的任意一个完成。根据完成的操作类型(定时器或信号),调用相应的处理逻辑。
- 创建一个
AsyncConnectSignal
这个函数用于将用户定义的回调连接到一个信号。
- 内部逻辑:
- 使用信号的
connect_extended
方法注册回调。 - 回调中使用
boost::asio::post
确保回调在正确的执行器上执行。 - 检查连接状态,确保在信号仍然连接时执行用户的处理逻辑。
- 使用信号的
实现一个协程方法
定义一个协程方法,使用awaitable
来声明协程或异步的返回类型。
awaitable<void> mock_pay(std::string auth_code) {
auto [ec, out_trade_no] = co_await PayRequest::SimulateMchPay(auth_code, 1);
if (ec) {
LOG_E("sim mch pay fail, ec: {} out_trade_no: {}", ec, out_trade_no);
} else {
LOG_I("sim mch pay out_trade_no: {}", out_trade_no);
}
co_return;
}
使用同步的代码风格写异步代码:
代码语言:javascript复制 co_await mock_pay(auth_code);
解析一下:
co_await
:一元运算符,语义是挂起协程,并将程序控制权返回给调用者。awaitable
: 支持co_await
运算符的类型,表示可等待对象。co_return
:用于从协程返回值,并标志着协程的结束。这与传统的return
语句类似,但它是专为协程设计的,确保在返回值之前正确地清理和挂起协程状态。
实现一个timer等待
代码语言:javascript复制 asio::steady_timer timeout(Threads::MainThread()->Executor(), std::chrono::seconds(2));
co_await timeout.async_wait(await_token);
解析一下:
asio::steady_timer
是 Boost.Asio 提供的一个用于精确计时的类。Threads::MainThread()->Executor()
获取了主线程的执行器(Executor)。这个执行器是处理异步事件的上下文。std::chrono::seconds(2)
指定定时器在两秒后激活。async_wait
是一个异步操作,当定时器达到指定的时间后,它被触发。await_token
是一个用于控制异步等待行为的对象。在 Boost.Asio 的 C 20 协程支持中,通常使用一种称为use_awaitable
的特殊对象作为await_token
。
实现等待一个超时异步信号
代码语言:javascript复制 auto [ec, result] = co_await comm::AsyncWaitSignalWithTimeout(
this_thread::Executor(), SystemInterface::Instance()->NetworkScanWifiCompletedSig(), std::chrono::seconds(10),
await_token);
以上这段代码将异步信号处理和超时逻辑封装到AsyncWaitSignalWithTimeout
方法中,开发者实现相应的信号处理逻辑和传递超时参数即可,代码非常简洁易读。等待异步信号处理的在日常开发中应用非常常见,按传统的实现方式会非常繁琐,使用协程之后就变得容易。这里涉及关于信号的实现,由于篇幅有限,这里后续有机会继续补充相应的实践,更详细的参考:
https://www.boost.org/doc/libs/1_84_0/doc/html/signals2.html
实现类似Promise的并发模型
代码语言:javascript复制 auto results = co_await (ShorkLinkQueryPayResult(auth_code) || AsyncWaitNetworkPushMessage(auth_code));
这段代码展示的是使用 ||
逻辑或操作符来实现多任务并行请求,等待两个异步任务,只要一个成功即视为成功。
ShorkLinkQueryPayResult代码示例:
这段代码展示的是通过短连接来实现查询支付结果,通过while循环结合timer实现一个定时轮询查单逻辑。可以看到每一次轮询都会调用co_await
来等待异步任务FetchPayResult
结果,也是通过协程的方式来处理网络请求。
AsyncWaitNetworkPushMessage代码示例:
这段代码展示的通过异步信号带超时时间,等待后台推送消息来实现推拉结合的方式查询支付结果。
除了逻辑或||
,自然也可以通过逻辑与&&
来实现等待多个异步任务的执行结果。对应的其实类似JavaScript中Promise机制中的Promise.all()
和Promise.any()
,使用async/await
语法糖即能实现类似的效果。
总结
本文介绍了协程的基本概念和用法,通过使用Boost.Asio框架实现了高效的协程封装,使用同步的方式编写异步代码带来的简洁性和代码健壮性,无需处理复杂的状态扭转,让开发更好的关注业务代码的实现,用更低的成本实现复杂的并发任务。笔者提供了我们在工程中常见的协程使用案例,比如:
- 使用
awaitable
来声明一个协程方法 - 使用
asio::steady_timer
来实现定时逻辑 - 使用
boost::signals2::signal
和post
方法来实现复杂的异步信号处理 - 使用
&&
和||
来实现类似Promise机制中的并发任务模型
通过以上的实践,基本可以满足90%以上业务开发当中的的异步编程场景,未来也可以继续深入学习异步编程的本质,探索更加高效和优雅的实现方式。