Envoy:event相关代码阅读(二)

2023-10-30 16:04:52 浏览数 (3)

本篇文章试图来介绍envoy的事件处理部分的代码,对于envoy来说是基于libevent做了简单封装来实现的异步调度。

本篇文章会从下面两部分来进行讲解,libevent的基础知识介绍,envoy中event的类的实现和event在envoy中的调度逻辑,本篇介绍第二部分内容。

一、envoy中event相关类的介绍

envoy将libevent的三类事件做了一个简单的封装,如下图所示:

signal类

timer类

文件类:

envoy核心处理事件的逻辑主要是在Dispatcherimpl里面。

二、envoy中事件调度的逻辑介绍

DispatcherImpl 类里面维护了一个 post_callbacks_队列,用于存储这些事件触发的callback函数,通过生产者、消费者模式进行互动来进行操作。 (一)生产者的实现方式: 使用post的入口,以及这部分postcallbacks都有哪一些?这里有三类生产者,分别是: 1.guarddog的postcallback:

代码语言:javascript复制
[&guarddog_thread_started]() { guarddog_thread_started.Notify(); }

设置postcallback的代码位置:

代码语言:javascript复制
void GuardDogImpl::start(Api::Api& api) {
  Thread::LockGuard guard(mutex_);

  // Synchronize between calling thread and guarddog thread.
  absl::Notification guarddog_thread_started;

  // See comments in WorkerImpl::start for the naming convention.
  Thread::Options options{absl::StrCat("dog:", dispatcher_->name())};
  thread_ = api.threadFactory().createThread(
      [this, &guarddog_thread_started]() -> void {
        loop_timer_->enableTimer(std::chrono::milliseconds(0));
        dispatcher_->post([&guarddog_thread_started]() { guarddog_thread_started.Notify(); });
        // 事件触发方式是
        // Runs the event-loop until loopExit() is called, blocking
        // until there are pending or active events.
        dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit);
      },
      options);

  guarddog_thread_started.WaitForNotification();
}

2.Server里的InstanceImpl实现: 这里对应的是主线程,对应的postcallback:

代码语言:javascript复制
[this] { notifyCallbacksForStage(Stage::Startup); }

实现代码的位置

代码语言:javascript复制
void InstanceImpl::run() {
  // RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
  // startup (see RunHelperTest in server_test.cc).
  const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
                                    access_log_manager_, init_manager_, overloadManager(), [this] {
                                      notifyCallbacksForStage(Stage::PostInit);
                                      startWorkers();
                                    });
 
  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = main_thread_guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(),
                                                         "main_thread", *dispatcher_);
  dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
  dispatcher_->run(Event::Dispatcher::RunType::Block); // Runs the event-loop until there are no pending events.
  ENVOY_LOG(info, "main dispatch loop exited");
  main_thread_guard_dog_->stopWatching(watchdog);
  watchdog.reset();
 
  terminate();
}

3.worker_impl实现的方式:// 这里对应的是worker 对应的postcallback,传递进来的cb是下面这个:

代码语言:javascript复制
[this, &guard_dog, cb]() {
cb();
watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
dispatcher_->name(), *dispatcher_);
}
// 上面对应的cb的代码实现如下所示:
[&workers_waiting_to_run]() {
workers_waiting_to_run.DecrementCount();
};

调用关系如下所示:startWorkers--->start--->createThread--->threadRoutine--->post // 最后一步添加的postcallback函数

代码语言:javascript复制
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
  ENVOY_LOG(info, "all dependencies initialized. starting workers");
  ASSERT(!workers_started_);
  workers_started_ = true;
  uint32_t i = 0;
 
  absl::BlockingCounter workers_waiting_to_run(workers_.size());
  Event::PostCb worker_started_running = [&workers_waiting_to_run]() {
    workers_waiting_to_run.DecrementCount();
  };
 
  // We can not use "Cleanup" to simplify this logic here, because it results in a issue if Envoy is
  // killed before workers are actually started. Specifically the AdminRequestGetStatsAndKill test
  // case in main_common_test fails with ASAN error if we use "Cleanup" here.
  const auto listeners_pending_init =
      std::make_shared<std::atomic<uint64_t>>(workers_.size() * active_listeners_.size());
  for (const auto& worker : workers_) {
    ENVOY_LOG(debug, "starting worker {}", i);
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, absl::nullopt, *listener,
                          [this, listeners_pending_init, callback]() {
                            if (--(*listeners_pending_init) == 0) {
                              stats_.workers_started_.set(1);
                              callback();
                            }
                          });
    }
    worker->start(guard_dog, worker_started_running); // 这里是入口最终调用的threadRoutine
    if (enable_dispatcher_stats_) {
      worker->initializeStats(*scope_);
    }
    i  ;
  }
 
// worker的启动入口,会调用threadRoutine
void WorkerImpl::start(GuardDog& guard_dog, const Event::PostCb& cb) {
  ASSERT(!thread_);
 
  // In posix, thread names are limited to 15 characters, so contrive to make
  // sure all interesting data fits there. The naming occurs in
  // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
  // have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
  // for the thread index, leaving us 4 bytes left to distinguish between the
  // two threads used per dispatcher. We'll call this one "dsp:" and the
  // one allocated in guarddog_impl.cc "dog:".
  //
  // TODO(jmarantz): consider refactoring how this naming works so this naming
  // architecture is centralized, resulting in clearer names.
  Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
  thread_ = api_.threadFactory().createThread(
      [this, &guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
}

threadRoutine核心代码逻辑:

代码语言:javascript复制
void WorkerImpl::threadRoutine(GuardDog& guard_dog, const Event::PostCb& cb) {
  ENVOY_LOG(debug, "worker entering dispatch loop");
  // The watch dog must be created after the dispatcher starts running and has post events flushed,
  // as this is when TLS stat scopes start working.
  dispatcher_->post([this, &guard_dog, cb]() {
    cb();
    watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
                                          dispatcher_->name(), *dispatcher_);
  });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(debug, "worker exited dispatch loop");
  guard_dog.stopWatching(watch_dog_);
  dispatcher_->shutdown();
 
  // We must close all active connections before we actually exit the thread. This prevents any
  // destructors from running on the main thread which might reference thread locals. Destroying
  // the handler does this which additionally purges the dispatcher delayed deletion list.
  handler_.reset();
  tls_.shutdownThread();
  watch_dog_.reset();
}

4.核心的生产postcallback的代码逻辑,post函数:

代码语言:javascript复制
void DispatcherImpl::post(std::function<void()> callback) {
  bool do_post;
  {
    Thread::LockGuard lock(post_lock_);
    do_post = post_callbacks_.empty();
    post_callbacks_.push_back(callback);
  }
 
// 构造函数对post_cb_进行了初始化操作:runPostCallbacks()这里做的事情是消费事件的处理逻辑
// post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
// 下面实际上是通过event_active激活去执行run 操作进行消费
  if (do_post) { // 这里表示的是当前线程没有事件执行的时候,去主动唤醒另外一个线程去处理它里面的内容
    post_cb_->scheduleCallbackCurrentIteration();
  }
}
 
// 进行event_active的激活操作,这里执行之后,在event队列里面会执行 上面设置的callback函数 runPostCallbacks()
void SchedulableCallbackImpl::scheduleCallbackCurrentIteration() {
  if (enabled()) { // 这里的实现参考下面的函数,主要是判断当前线程里面的raw_event有没有正在排队的时间,有的话,就直接返回了
    return;
  }
  // event_active directly adds the event to the end of the work queue so it executes in the current
  // iteration of the event loop.
  event_active(&raw_event_, EV_TIMEOUT, 0);
}
 
bool SchedulableCallbackImpl::enabled() { return 0 != evtimer_pending(&raw_event_, nullptr); }

(二)消费者的实现方式

run()核心代码,先执行callback函数,再触发event_base_loop()。

代码语言:javascript复制
void DispatcherImpl::run(RunType type) {
  run_tid_ = api_.threadFactory().currentThreadId();
  // Flush all post callbacks before we run the event loop. We do this because there are post
  // callbacks that have to get run before the initial event loop starts running. libevent does
  // not guarantee that events are run in any particular order. So even if we post() and call
  // event_base_once() before some other event, the other event might get called first.
  runPostCallbacks(); // 批量执行callback函数
  base_scheduler_.run(type);
}

1.runPostCallbacks:这个函数是核心消费逻辑

代码语言:javascript复制
void DispatcherImpl::runPostCallbacks() {
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
  // callback processing, and more easily detect if a scheduled post callback refers to one of the
  // objects that is being deferred deleted.
  clearDeferredDeleteList(); // 延迟删除上一次event触发之后的未清理的数据结构
 
  std::list<std::function<void()>> callbacks;
  {
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
    // later in the event loop.
    Thread::LockGuard lock(post_lock_);
    // 这里先操作了copy动作,相当于把post_callbacks_的内容转移到callbacks了,这样是为了post_callbacks_可以用来继续做别的事情
    callbacks = std::move(post_callbacks_);
    // post_callbacks_ should be empty after the move.
    ASSERT(post_callbacks_.empty());
  }
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
  // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
  while (!callbacks.empty()) {
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
    // executing a long list of callbacks.
    touchWatchdog();
    // Run the callback.
    callbacks.front()(); // 这里是把这些callback从队列的头开始逐次去调用执行
    // Pop the front so that the destructor of the callback that just executed runs before the next
    // callback executes.
    callbacks.pop_front();// 执行完之后,就从这个callbacks队列里面删除掉
  }
}

2.触发event_base_loop()

代码语言:javascript复制
void LibeventScheduler::run(Dispatcher::RunType mode) {
  int flag = 0;
  switch (mode) {
  case Dispatcher::RunType::NonBlock:
    flag = LibeventScheduler::flagsBasedOnEventType();
  case Dispatcher::RunType::Block:
    // The default flags have 'block' behavior. See
    // http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
    break;
  case Dispatcher::RunType::RunUntilExit:
    flag = EVLOOP_NO_EXIT_ON_EMPTY;
    break;
  }
  event_base_loop(libevent_.get(), flag); // 默认是NonBlock,这里触发事件循环
}

参考文档: 版本对应的是1.11.2: https://github.com/istio/proxy

https://blog.csdn.net/weixin_34198797/article/details/89627369?ops_request_misc=%7B%22request%5Fid%22%3A%22167715942416800180647283%22%2C%22scm%22%3A%2220140713.130102334..%22%7D&request_id=167715942416800180647283&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-89627369-null-null.142^v73^wechat,201^v4^add_ask,239^v2^insert_chatgpt&utm_term=envoy dispatcher&spm=1018.2226.3001.4187

0 人点赞