上一篇文章,介绍了TaskControl(简称TC)的初始化逻辑、worker的基本概念,并引出了TaskGroup(简称TG)的主要函数:run_main_task()。在谈run_main_task()之前,我们先看一下TG的几个主要成员。
TG的主要成员
讲到TG先看TG的主要成员:
代码语言:javascript复制 size_t _steal_seed;
size_t _steal_offset;
ContextualStack* _main_stack;
bthread_t _main_tid;
WorkStealingQueue<bthread_t> _rq;
RemoteTaskQueue _remote_rq;
每个TG都维护自己一个单独的栈指针:_main_stack和_main_tid。也就是是说TG中有一个特殊的TM。我姑且称之为“主TM”。这两个是在TG初始化的时候赋值的。
每个TG有两个TM的队列:rq和remote_rq,它们之间有啥区别呢?
rq 与 remote_rq
通过在代码里搜索这两个队列入队的逻辑,可以发现。当调用bthread_start_background()创建bthread任务时,其内部会继续调用TG的ready_to_run(),接着push_rq()函数,给TG的rq入队。而remote_rq队列的入队是是通过执行TG的ready_to_run_remote()完成的。
再看一下ready_to_run_remote注释:
代码语言:javascript复制 // Push a bthread into the runqueue from another non-worker thread.
void ready_to_run_remote(bthread_t tid, bool nosignal = false);
在没有woker(TG)的线程中把bthread入队,只能入到有worder线程中的TG的remote_rq队列。
再看下ready_to_run_remote()的调用的地方。
在butex_wake()中:
代码语言:javascript复制 TaskGroup* g = tls_task_group;
if (g) {
TaskGroup::exchange(&g, bbw->tid);
} else {
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
}
在start_background()中:
代码语言:javascript复制template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
...
...
if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
}
return 0;
}
start_background<>()
模板参数为true的时候被调用ready_to_run_remote()
。而在start_from_non_worker()
中,会调用start_background<true>()
!
好了,言归正传。
TaskGroup::run_main_task()
run_main_task(),去掉一些bvar相关的代码,这个函数也异常简洁。
代码语言:javascript复制void TaskGroup::run_main_task() {
...
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
...
}
// Don't forget to add elapse of last wait_task.
current_task()->stat.cputime_ns = butil::cpuwide_time_ns() - _last_run_ns;
}
死循环执行wait_task来等待有效的任务,如果能等到任务,wait_task()的出参tid(bthread_t类型)会记录这个任务的ID号。好了,拿到任务ID号tid后,执行sched_to函数来切换栈。在进行了一些check工作后,判断如果当前的tid不是TG的主要tid(main_tid)则执行:TaskGroup::task_runner(1);
由此观之,我们发现三个关键函数:wait_task()
、sched_to()
、 task_runner()
简述一下他们的基本功能:
- wait_task():找到一个任务。其中会涉及工作窃取(work stealing)。
- sched_to():进行栈、寄存器等运行时上下文的切换,为接下来运行的任务恢复其上下文。
- task_runner():执行任务。
现在我们的观察视角终于可以切入到“work stealing”了。
工作窃取(work stealing)
首先声明,work stealing不是协程的专利,更不是Go语言的专利。work stealing是一种通用的实现负载均衡的算法。这里的负载均衡说的不是像Nginx那种对于外部网络请求做负载均衡,此处指的是每个CPU处理任务时,每个核的负载均衡。不止协程,其实线程池也可以做work stealing。
20世纪90年代,MIT的Charles E. Leiserson 教授发起并指导了CILK项目。该项目发表了许多论文,启发了各种使用“工作窃取”的基于任务的调度器。
TaskGroup::wait_task()
看源码,这里简化起见,去掉了BTHREAD_DONT_SAVE_PARKING_STATE条件宏判断逻辑相关
代码语言:javascript复制bool TaskGroup::wait_task(bthread_t* tid) {
do {
if (_last_pl_state.stopped()) {
return false;
}
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
}
} while (true);
}
_pl是ParkingLot*类型,_last_plstate是pl中的state。关于它俩的更多介绍,后面会有其他文章。
_pl->wait(_last_pl_state)内部调用的futex做的wait操作,这里可以简单理解为阻塞等待被通知来终止阻塞,当阻塞结束之后,执行steal_task()来进行工作窃取。如果窃取成功则返回。
代码语言:javascript复制TaskGoup::steal_task()
bool steal_task(bthread_t* tid) {
if (_remote_rq.pop(tid)) {
return true;
}
_last_pl_state = _pl->get_state();
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
首先TG的remote_rq队列中的任务出队,如果没有则同全局TC来窃取任务。
视角从TG跳出,来看一看TC的steal_task()
TaskControl::steal_task
代码语言:javascript复制bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}
// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; i, s = offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}
可以看出是随机找一个TG,先从它的rq队列窃取任务,如果失败再从它的remote_rq队列窃取任务。在消费的时候rq比remote_rq有更高的优先级,显而易见,我们一定是想先执行有woker的线程自己push到队列中的bthread,然后再消费其他线程push给自己的bthread。
通过上面三个函数可以看出TaskGroup::wait_task() 在等待任务的时候,是优先获取当前TG的remote_rq,然后是依次窃取其他TG的rq、remote_rq。它并没有从当前TG的rq找任务!这是为什么呢?原因是避免race condition。也就是避免多个TG 等待任务的时候,当前TG从rq取任务,与其他TG过来自己这边窃取任务造成竞态。从而提升一点点的性能。
那么当前TG的rq是什么时候被消费的呢?
在TG的ending_sched()函数中有rq的出队操作,而ending_sched()在task_runner中被调用,task_runner也是run_main_task()的三个关键函数之一。
TaskGroup::task_runner()
代码语言:javascript复制void TaskGroup::task_runner(intptr_t skip_remained) {
TaskGroup* g = tls_task_group;
if (!skip_remained) {
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
...
}
在run_main_task()中task_runner()的输入参数是1,所以上面的if逻辑会被跳过。这里忽略这个if,继续向下看,下面是一个很长的do-while循环(去掉一些日志和bvar相关逻辑,补充注释):
代码语言:javascript复制 do {
// Meta and identifier of the task is persistent in this run.
TaskMeta* const m = g->_cur_meta;
...
// 执行TM(bthread)中的回调函数
void* thread_return;
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}
// Group is probably changed
g = tls_task_group;
// TODO: Save thread_return
(void)thread_return;
... 日志
// 清理 线程局部变量(下面是原注释)
// Clean tls variables, must be done before changing version_butex
// otherwise another thread just joined this thread may not see side
// effects of destructing tls variables.
KeyTable* kt = tls_bls.keytable;
if (kt != NULL) {
return_keytable(m->attr.keytable_pool, kt);
// After deletion: tls may be set during deletion.
tls_bls.keytable = NULL;
m->local_storage.keytable = NULL; // optional
}
// 累加版本号,且版本号不能为0(下面是原注释)
// Increase the version and wake up all joiners, if resulting version
// is 0, change it to 1 to make bthread_t never be 0. Any access
// or join to the bthread after changing version will be rejected.
// The spinlock is for visibility of TaskGroup::get_attr.
{
BAIDU_SCOPED_LOCK(m->version_lock);
if (0 == *m->version_butex) {
*m->version_butex;
}
}
// 唤醒joiner
butex_wake_except(m->version_butex, 0);
// _nbthreads减1(注意_nbthreads不是整型)
g->_control->_nbthreads << -1;
g->set_remained(TaskGroup::_release_last_context, m);
// 查找下一个任务,并切换到其对应的运行时上下文
ending_sched(&g);
} while (g->_cur_meta->tid != g->_main_tid);
do while循环中会执行回调函数,结束的时候会查找下一个任务,并切换上下文。循环的终止条件是tls_task_group的_cur_meta不等于其_main_tid。
在ending_sched()中,会有依次从TG的rq、remote_rq取任务,找不到再窃取其他TG的任务,如果都找不到任务,则设置_cur_meta为_main_tid,也就是让task_runner()的循环终止。
然后就会回到run_main_task()的主循环,继续wait_task()等待新任务了。
好了,run_main_task()的三大关键函数,已过其二,还剩下一个sched_to()还未揭开其庐山真面,下一篇文章,我来带大家解读sched_to()。之所以把它单独成篇,是因为会涉及一些汇编的知识,读起来可能晦涩艰深。大家做好准备!