在获取数据的时候会根据每个 task 的 respChan 数据来做排序
sender 会将所有的 task 放入到 taskCh 中,发送完毕之后关闭 channel。
下面再来看看 worker:
代码语言:javascript复制Copyfunc (worker *copIteratorWorker) run(ctx context.Context) {
defer func() {
failpoint.Inject("ticase-4169", func(val failpoint.Value) {
if val.(bool) {
worker.memTracker.Consume(10 * MockResponseSizeForTest)
worker.memTracker.Consume(10 * MockResponseSizeForTest)
}
})
worker.wg.Done()
}()
for task := range worker.taskCh {
respCh := worker.respChan
// 这里是需要排序的时候为空,那么为每个 task 都创建一个 respChan
if respCh == nil {
respCh = task.respChan
}
// 发送rpc请求
worker.handleTask(ctx, task, respCh)
if worker.respChan != nil {
// 发送 finCopResp 到 respCh 中,告诉copIterator有一个task已经运行完毕了
worker.sendToRespCh(finCopResp, worker.respChan, false)
}
close(task.respChan)
if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
return
}
select {
case <-worker.finishCh:
return
default:
}
}
}
worker 主要是处理 sender 发送过来的 taskCh 数据,通过遍历 taskCh 获取 task 之后调用 handleTask 发送 rpc 请求,返回的数据会放入到 respCh 中。需要注意这里如果是有序的 task ,那么 worker.respChan
为空,然后会为每个 task 创建一个 respChan,在获取数据的时候会根据每个 task 的 respChan 数据来做排序。