asio 调度器实现 - strand 实现详解

2023-05-23 10:35:23 浏览数 (1)

只有一个线程在执行scheduler::run() 的情况, 我们不需要担心任务执行的先后顺序,它们始终是严格按照post()的先后顺序来执行的. 那么如果我们更多的利用多核, 使用多个线程执行同一个contextrun(), 那任务被哪个线程调度到并执行, 会变成一个不可预测的事情, 这种情况下, 如果任务之间存在依赖, 我们又不希望在业务侧过多的使用同步原语, 应该怎么做呢? 答案就是本篇的主角 - asio::strand.

1. 多线程 run()strand 的示例

我们先来看一下相关的示例代码:

代码语言:javascript复制
asio::io_context ctx{};
    auto wg = asio::make_work_guard(ctx);
    std::thread tmp_thread([&ctx] { ctx.run(); });
    std::thread tmp_thread1([&ctx] { ctx.run(); });
    std::thread tmp_thread2([&ctx] { ctx.run(); });
    std::thread tmp_thread3([&ctx] { ctx.run(); });

    std::allocator<void> alloc;

    char buf[256] = {0};
    for (int i = 0; i < 10; i  ) {
        sprintf_s(buf, sizeof(buf), "task id: %d run!", i);
        std::string tmpstr(buf);
        ctx.get_executor().post([tmpstr] {
            std::cout << tmpstr.c_str() << std::endl;
        }, alloc);
      }

    std::this_thread::sleep_for(5s);

能够相象得到, 在多核电脑上, 我们得到的输出必然不是一个整齐的从0到9的输出:

代码语言:javascript复制
task id: 0 run!task id: 2 run!

task id: 1 run!
task id: 3 run!
task id: 5 run!
task id: 7 run!
task id: 6 run!
task id: 4 run!
task id: 8 run!
task id: 9 run!

通过使用 strand, 我们对上面的代码稍作调整, 变为下面的实现:

代码语言:javascript复制
asio::io_context ctx{};
    auto wg = asio::make_work_guard(ctx);
    std::thread tmp_thread([&ctx] { ctx.run(); });
    std::thread tmp_thread1([&ctx] { ctx.run(); });
    std::thread tmp_thread2([&ctx] { ctx.run(); });
    std::thread tmp_thread3([&ctx] { ctx.run(); });

    std::allocator<void> alloc;

    asio::io_context::strand strand(ctx);
    char buf[256] = {0};
    for (int i = 0; i < 10; i  ) {
        sprintf_s(buf, sizeof(buf), "task id: %d run!", i);
        std::string tmpstr(buf);
        strand.post([tmpstr] {
            std::cout << tmpstr.c_str() << std::endl;
        }, alloc);
      }

    std::this_thread::sleep_for(5s);

调整后的执行结果为:

代码语言:javascript复制
task id: 0 run!
task id: 1 run!
task id: 2 run!
task id: 3 run!
task id: 4 run!
task id: 5 run!
task id: 6 run!
task id: 7 run!
task id: 8 run!
task id: 9 run!

我们发现所有task已经按照post顺序逐一打印了, 这是如何做到的呢? 我们来具体展开 asio::strand 相关的实现代码了解其中的机制.


2 strand 的实现细节

因为strand的特殊性, 肯定是没有办法直接使用前面介绍的普通任务的post()机制和相关的operation包装来完成相关的封装的, 我们分为三个部分来分析strand的实现: 1. strand相关的operation定义 2. strand上的task的投递 3. strand上的task的执行


2.1 strand相关的operation定义

代码语言:javascript复制
// The underlying implementation of a strand.
  class strand_impl
    : public operation
  {
  public:
    strand_impl(): operation(&strand_service::do_complete), locked_(false)
  private:
    // Only this service will have access to the internal values.
    friend class strand_service;
    friend struct on_do_complete_exit;
    friend struct on_dispatch_exit;

    // Mutex to protect access to internal data.
    asio::detail::mutex mutex_;

    // Indicates whether the strand is currently "locked" by a handler. This
    // means that there is a handler upcall in progress, or that the strand
    // itself has been scheduled in order to invoke some pending handlers.
    bool locked_;

    // The handlers that are waiting on the strand but should not be run until
    // after the next time the strand is scheduled. This queue must only be
    // modified while the mutex is locked.
    op_queue<operation> waiting_queue_;

    // The handlers that are ready to be run. Logically speaking, these are the
    // handlers that hold the strand's lock. The ready queue is only modified
    // from within the strand and so may be accessed without locking the mutex.
    op_queue<operation> ready_queue_;
  };

这部分代码本身注释比较多, 我们主要关注几点: 1. 构造函数处, 我们将strand_implcomplete关联到了strand_service::do_complete()处 2. 首先strandoperation本身是带锁的, 后面也会提到, 相关的锁粒度非常小. 3. strandoperation包含两个队列, 一个ready_queue_和一个waiting_queue_. 4. 一个locked_标志, 这些共同配合, 使得strand能够达成最小粒度锁的实现. 5. 注释比较详细, 结合相关的post和complete过程理解更佳.


2.2 strand上的task投递

strand::post()的执行过程如下: 1. strand::post()开始执行 2. 内部会触发strand_service::post()的执行 3. 会继续触发strand_service::do_post()的执行 我们挨级分析相关的实现重点:


2.2.1 level 1: strand::post()

代码语言:javascript复制
template <typename Function, typename Allocator>
 void post(Function&& f, const Allocator& a) const
 {
 typename std::decay<Function>::type tmp(static_cast<Function&&>(f));
 service_.post(impl_, tmp);
 (void)a;
 }

这里就是很简单的将 Function类型退化后, 继续调用strand_service::post(), 注意此处直接抛弃了外部传递的allocator, 应该是1.16版本实现不完整, 直接没给strand的operation匹配正确的allocator, 翻阅1.22的代码实现, 这部分的allocator是有被正确处理的, 对于我们来说这处细节影响不大, 我们直接忽略.


2.2.2 level 2: strand_service::post()

代码语言:javascript复制
template <typename Handler>
void strand_service::post(strand_service::implementation_type& impl,
    Handler& handler)
{
  bool is_continuation =
    asio_handler_cont_helpers::is_continuation(handler);

  // Allocate and construct an operation to wrap the handler.
  typedef completion_handler<Handler> op;
  typename op::ptr p = { asio::detail::addressof(handler),
    op::ptr::allocate(handler), 0 };
  p.p = new (p.v) op(handler);

  ASIO_HANDLER_CREATION((this->context(),
        *p.p, "strand", impl, 0, "post"));

  do_post(impl, p.p, is_continuation);
  p.v = p.p = 0;
}

这部分代码基本就是我们之前分析过的scheduler::post()的翻版了, 略有差异的地方是此处使用的不是execution_op, 而是使用了另外一个类型 completion_handler<Handler>, 该类型的实现与execution_op基本没有太大的差别, 除了completion_handler本身不保存外部传入的Alloc这点外. 它本身也是完成对传入的Function的类型擦除, 提供一个统一类型的do_completion()接口, 方便scheduler侧对相应的task进行延迟调用, 相关的实现对比之前讲述的post()部分可以比较好的理解, 这里不再赘述了.


2.2.3 level 3: strand_service::do_post()

代码语言:javascript复制
void strand_service::do_post(implementation_type& impl,
    operation* op, bool is_continuation)
{
  impl->mutex_.lock();
  if (impl->locked_)
  {
    // Some other handler already holds the strand lock. Enqueue for later.
    impl->waiting_queue_.push(op);
    impl->mutex_.unlock();
  }
  else
  {
    // The handler is acquiring the strand lock and so is responsible for
    // scheduling the strand.
    impl->locked_ = true;
    impl->mutex_.unlock();
    impl->ready_queue_.push(op);
    io_context_.post_immediate_completion(impl, is_continuation);
  }
}

这部分也有很详细的注释, 详细解释了分支的处理情况: 1. impl(就是strand独有的strand_impl这个operation) locked_标识为true, 则将任务推送至waiting_queue_; 2. impl->locked_标识为false, 则将任务推送至ready_queue_, 并将locked_标识置为true;

此处我们需要注意第2种情况锁的释放时机, 锁是在标识设置完成后立即解锁的, 然后马上执行io_context::post_immediate_completion()impl本身推送至scheduler等待执行. 实际上我们也可以很直白的来理解它, 当strand刚开始工作时, 我们推送一个任务, 必然走的是2这个分支, 如果推送的任务没有得到及时的执行, 那么locked_标识依然还是true, 则后续推送的任务会被加入到waiting_queue_, 而因为waiting_queue_本身是带锁的, 这也不难理解, 为什么通过strand投递任务后, 所有任务的执行都会是有序的了. 实际过程会比这段描述的更复杂一点, 后续讲述执行的过程中会逐步展开. post_immediate_completion()的代码实现前面scheduler部分我们已经具体展开了, 此处就不再细说了.


2.3 strand 上的 task 执行

前面介绍strand_impl的时候我们也提到过, strand_impl构造的时候将自己的complete()调用挂接到了strand_service::do_complete()处, 前面我们已经介绍过了scheduler中任务的执行时机 - run()中会通过调用do_run_one(), 获取到一个operation进行执行, 我们此处直接从strand_service::do_complete()进行分析:

代码语言:javascript复制
void strand_service::do_complete(void* owner, operation* base,
    const asio::error_code& ec, std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    strand_impl* impl = static_cast<strand_impl*>(base);

    // Indicate that this strand is executing on the current thread.
    call_stack<strand_impl>::context ctx(impl);

    // Ensure the next handler, if any, is scheduled on block exit.
    on_do_complete_exit on_exit;
    on_exit.owner_ = static_cast<io_context_impl*>(owner);
    on_exit.impl_ = impl;

    // Run all ready handlers. No lock is required since the ready queue is
    // accessed only within the strand.
    while (operation* o = impl->ready_queue_.front())
    {
      impl->ready_queue_.pop();
      o->complete(owner, ec, 0);
    }
  }
}

因为关联strand_service::do_complete()operation只会是strand_impl, 所以此处一开始我们就将base转换到了该类型. 剩下的代码就比较简单了, 对于所有ready_queue_中的operation, 我们挨个取出并执行. 比较特殊的地方是on_do_comple_exit这个scope对象, 我们仔细分析, 发现我们的waiting_queue_并没有被处理, 这其实就是由on_do_comple_exit这个scope对象通过RAII来完成的操作:

代码语言:javascript复制
struct strand_service::on_do_complete_exit
{
  io_context_impl* owner_;
  strand_impl* impl_;

  ~on_do_complete_exit()
  {
    impl_->mutex_.lock();
    impl_->ready_queue_.push(impl_->waiting_queue_);
    bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty();
    impl_->mutex_.unlock();

    if (more_handlers)
      owner_->post_immediate_completion(impl_, true);
  }
};

通过scope对象 on_do_complete_exit, 我们完成了将所有waiting_queue_包含的operationready_queue_转移的过程, 并且在ready_queue_非空的情况 , 我们会再次通过post_immediate_completion()将当前impl推送的scheduler中等待再次执行.


3. strand小结

asio::strand 机制利用内部小粒度的锁换来业务层可以用更简单的方式来处理并发任务的关联执行, 本身还是很巧妙的, 这种方式也有助于业务层写出易理解维护的代码, 同时小粒度的锁本身也会有比较好的性能表现.

0 人点赞