只有一个线程在执行scheduler::run()
的情况, 我们不需要担心任务执行的先后顺序,它们始终是严格按照post()
的先后顺序来执行的. 那么如果我们更多的利用多核, 使用多个线程执行同一个context
的run()
, 那任务被哪个线程调度到并执行, 会变成一个不可预测的事情, 这种情况下, 如果任务之间存在依赖, 我们又不希望在业务侧过多的使用同步原语, 应该怎么做呢? 答案就是本篇的主角 - asio::strand
.
1. 多线程 run()
和 strand
的示例
我们先来看一下相关的示例代码:
代码语言:javascript复制asio::io_context ctx{};
auto wg = asio::make_work_guard(ctx);
std::thread tmp_thread([&ctx] { ctx.run(); });
std::thread tmp_thread1([&ctx] { ctx.run(); });
std::thread tmp_thread2([&ctx] { ctx.run(); });
std::thread tmp_thread3([&ctx] { ctx.run(); });
std::allocator<void> alloc;
char buf[256] = {0};
for (int i = 0; i < 10; i ) {
sprintf_s(buf, sizeof(buf), "task id: %d run!", i);
std::string tmpstr(buf);
ctx.get_executor().post([tmpstr] {
std::cout << tmpstr.c_str() << std::endl;
}, alloc);
}
std::this_thread::sleep_for(5s);
能够相象得到, 在多核电脑上, 我们得到的输出必然不是一个整齐的从0到9的输出:
代码语言:javascript复制task id: 0 run!task id: 2 run!
task id: 1 run!
task id: 3 run!
task id: 5 run!
task id: 7 run!
task id: 6 run!
task id: 4 run!
task id: 8 run!
task id: 9 run!
通过使用 strand
, 我们对上面的代码稍作调整, 变为下面的实现:
asio::io_context ctx{};
auto wg = asio::make_work_guard(ctx);
std::thread tmp_thread([&ctx] { ctx.run(); });
std::thread tmp_thread1([&ctx] { ctx.run(); });
std::thread tmp_thread2([&ctx] { ctx.run(); });
std::thread tmp_thread3([&ctx] { ctx.run(); });
std::allocator<void> alloc;
asio::io_context::strand strand(ctx);
char buf[256] = {0};
for (int i = 0; i < 10; i ) {
sprintf_s(buf, sizeof(buf), "task id: %d run!", i);
std::string tmpstr(buf);
strand.post([tmpstr] {
std::cout << tmpstr.c_str() << std::endl;
}, alloc);
}
std::this_thread::sleep_for(5s);
调整后的执行结果为:
代码语言:javascript复制task id: 0 run!
task id: 1 run!
task id: 2 run!
task id: 3 run!
task id: 4 run!
task id: 5 run!
task id: 6 run!
task id: 7 run!
task id: 8 run!
task id: 9 run!
我们发现所有task已经按照post顺序逐一打印了, 这是如何做到的呢? 我们来具体展开 asio::strand
相关的实现代码了解其中的机制.
2 strand
的实现细节
因为strand
的特殊性, 肯定是没有办法直接使用前面介绍的普通任务的post()
机制和相关的operation
包装来完成相关的封装的, 我们分为三个部分来分析strand的实现: 1. strand
相关的operation
定义 2. strand
上的task
的投递 3. strand
上的task
的执行
2.1 strand
相关的operation
定义
代码语言:javascript复制// The underlying implementation of a strand.
class strand_impl
: public operation
{
public:
strand_impl(): operation(&strand_service::do_complete), locked_(false)
private:
// Only this service will have access to the internal values.
friend class strand_service;
friend struct on_do_complete_exit;
friend struct on_dispatch_exit;
// Mutex to protect access to internal data.
asio::detail::mutex mutex_;
// Indicates whether the strand is currently "locked" by a handler. This
// means that there is a handler upcall in progress, or that the strand
// itself has been scheduled in order to invoke some pending handlers.
bool locked_;
// The handlers that are waiting on the strand but should not be run until
// after the next time the strand is scheduled. This queue must only be
// modified while the mutex is locked.
op_queue<operation> waiting_queue_;
// The handlers that are ready to be run. Logically speaking, these are the
// handlers that hold the strand's lock. The ready queue is only modified
// from within the strand and so may be accessed without locking the mutex.
op_queue<operation> ready_queue_;
};
这部分代码本身注释比较多, 我们主要关注几点: 1. 构造函数处, 我们将strand_impl
的complete
关联到了strand_service::do_complete()
处 2. 首先strand
的operation
本身是带锁的, 后面也会提到, 相关的锁粒度非常小. 3. strand
的operation
包含两个队列, 一个ready_queue_
和一个waiting_queue_
. 4. 一个locked_标志, 这些共同配合, 使得strand能够达成最小粒度锁的实现. 5. 注释比较详细, 结合相关的post和complete过程理解更佳.
2.2 strand
上的task
投递
strand::post()
的执行过程如下: 1. strand::post()
开始执行 2. 内部会触发strand_service::post()
的执行 3. 会继续触发strand_service::do_post()
的执行 我们挨级分析相关的实现重点:
2.2.1 level 1: strand::post()
代码语言:javascript复制template <typename Function, typename Allocator>
void post(Function&& f, const Allocator& a) const
{
typename std::decay<Function>::type tmp(static_cast<Function&&>(f));
service_.post(impl_, tmp);
(void)a;
}
这里就是很简单的将 Function类型退化后, 继续调用strand_service::post(), 注意此处直接抛弃了外部传递的allocator, 应该是1.16版本实现不完整, 直接没给strand的operation匹配正确的allocator, 翻阅1.22的代码实现, 这部分的allocator是有被正确处理的, 对于我们来说这处细节影响不大, 我们直接忽略.
2.2.2 level 2: strand_service::post()
代码语言:javascript复制template <typename Handler>
void strand_service::post(strand_service::implementation_type& impl,
Handler& handler)
{
bool is_continuation =
asio_handler_cont_helpers::is_continuation(handler);
// Allocate and construct an operation to wrap the handler.
typedef completion_handler<Handler> op;
typename op::ptr p = { asio::detail::addressof(handler),
op::ptr::allocate(handler), 0 };
p.p = new (p.v) op(handler);
ASIO_HANDLER_CREATION((this->context(),
*p.p, "strand", impl, 0, "post"));
do_post(impl, p.p, is_continuation);
p.v = p.p = 0;
}
这部分代码基本就是我们之前分析过的scheduler::post()
的翻版了, 略有差异的地方是此处使用的不是execution_op
, 而是使用了另外一个类型 completion_handler<Handler>
, 该类型的实现与execution_op
基本没有太大的差别, 除了completion_handler
本身不保存外部传入的Alloc
这点外. 它本身也是完成对传入的Function的类型擦除, 提供一个统一类型的do_completion()
接口, 方便scheduler
侧对相应的task
进行延迟调用, 相关的实现对比之前讲述的post()
部分可以比较好的理解, 这里不再赘述了.
2.2.3 level 3: strand_service::do_post()
代码语言:javascript复制void strand_service::do_post(implementation_type& impl,
operation* op, bool is_continuation)
{
impl->mutex_.lock();
if (impl->locked_)
{
// Some other handler already holds the strand lock. Enqueue for later.
impl->waiting_queue_.push(op);
impl->mutex_.unlock();
}
else
{
// The handler is acquiring the strand lock and so is responsible for
// scheduling the strand.
impl->locked_ = true;
impl->mutex_.unlock();
impl->ready_queue_.push(op);
io_context_.post_immediate_completion(impl, is_continuation);
}
}
这部分也有很详细的注释, 详细解释了分支的处理情况: 1. impl
(就是strand
独有的strand_impl
这个operation
) locked_
标识为true
, 则将任务推送至waiting_queue_
; 2. impl->locked_
标识为false
, 则将任务推送至ready_queue_
, 并将locked_
标识置为true
;
此处我们需要注意第2种情况锁的释放时机, 锁是在标识设置完成后立即解锁的, 然后马上执行io_context::post_immediate_completion()
将impl
本身推送至scheduler
等待执行. 实际上我们也可以很直白的来理解它, 当strand
刚开始工作时, 我们推送一个任务, 必然走的是2这个分支, 如果推送的任务没有得到及时的执行, 那么locked_
标识依然还是true
, 则后续推送的任务会被加入到waiting_queue_
, 而因为waiting_queue_
本身是带锁的, 这也不难理解, 为什么通过strand投递任务后, 所有任务的执行都会是有序的了. 实际过程会比这段描述的更复杂一点, 后续讲述执行的过程中会逐步展开. post_immediate_completion()
的代码实现前面scheduler
部分我们已经具体展开了, 此处就不再细说了.
2.3 strand
上的 task
执行
前面介绍strand_impl
的时候我们也提到过, strand_impl
构造的时候将自己的complete()
调用挂接到了strand_service::do_complete()
处, 前面我们已经介绍过了scheduler
中任务的执行时机 - run()
中会通过调用do_run_one()
, 获取到一个operation
进行执行, 我们此处直接从strand_service::do_complete()
进行分析:
void strand_service::do_complete(void* owner, operation* base,
const asio::error_code& ec, std::size_t /*bytes_transferred*/)
{
if (owner)
{
strand_impl* impl = static_cast<strand_impl*>(base);
// Indicate that this strand is executing on the current thread.
call_stack<strand_impl>::context ctx(impl);
// Ensure the next handler, if any, is scheduled on block exit.
on_do_complete_exit on_exit;
on_exit.owner_ = static_cast<io_context_impl*>(owner);
on_exit.impl_ = impl;
// Run all ready handlers. No lock is required since the ready queue is
// accessed only within the strand.
while (operation* o = impl->ready_queue_.front())
{
impl->ready_queue_.pop();
o->complete(owner, ec, 0);
}
}
}
因为关联strand_service::do_complete()
的operation
只会是strand_impl
, 所以此处一开始我们就将base
转换到了该类型. 剩下的代码就比较简单了, 对于所有ready_queue_
中的operation
, 我们挨个取出并执行. 比较特殊的地方是on_do_comple_exit
这个scope
对象, 我们仔细分析, 发现我们的waiting_queue_
并没有被处理, 这其实就是由on_do_comple_exit
这个scope对象通过RAII来完成的操作:
struct strand_service::on_do_complete_exit
{
io_context_impl* owner_;
strand_impl* impl_;
~on_do_complete_exit()
{
impl_->mutex_.lock();
impl_->ready_queue_.push(impl_->waiting_queue_);
bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty();
impl_->mutex_.unlock();
if (more_handlers)
owner_->post_immediate_completion(impl_, true);
}
};
通过scope
对象 on_do_complete_exit
, 我们完成了将所有waiting_queue_
包含的operation
向ready_queue_
转移的过程, 并且在ready_queue_
非空的情况 , 我们会再次通过post_immediate_completion()
将当前impl推送的scheduler
中等待再次执行.
3. strand
小结
asio::strand
机制利用内部小粒度的锁换来业务层可以用更简单的方式来处理并发任务的关联执行, 本身还是很巧妙的, 这种方式也有助于业务层写出易理解维护的代码, 同时小粒度的锁本身也会有比较好的性能表现.