在获取数据的时候会根据每个 task 的 respChan 数据来做排序

2021-09-26 17:24:18 浏览数 (1)

在获取数据的时候会根据每个 task 的 respChan 数据来做排序

sender 会将所有的 task 放入到 taskCh 中,发送完毕之后关闭 channel。

下面再来看看 worker:

代码语言:javascript复制
Copyfunc (worker *copIteratorWorker) run(ctx context.Context) {
	defer func() {
		failpoint.Inject("ticase-4169", func(val failpoint.Value) {
			if val.(bool) {
				worker.memTracker.Consume(10 * MockResponseSizeForTest)
				worker.memTracker.Consume(10 * MockResponseSizeForTest)
			}
		})
		worker.wg.Done()
	}()
	for task := range worker.taskCh {
		respCh := worker.respChan
		// 这里是需要排序的时候为空,那么为每个 task 都创建一个 respChan
		if respCh == nil {
			respCh = task.respChan
		}
		// 发送rpc请求
		worker.handleTask(ctx, task, respCh)
		if worker.respChan != nil { 
			// 发送 finCopResp 到 respCh 中,告诉copIterator有一个task已经运行完毕了
			worker.sendToRespCh(finCopResp, worker.respChan, false)
		}
		close(task.respChan)
		if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
			return
		}
		select {
		case <-worker.finishCh:
			return
		default:
		}
	}
}

worker 主要是处理 sender 发送过来的 taskCh 数据,通过遍历 taskCh 获取 task 之后调用 handleTask 发送 rpc 请求,返回的数据会放入到 respCh 中。需要注意这里如果是有序的 task ,那么 worker.respChan 为空,然后会为每个 task 创建一个 respChan,在获取数据的时候会根据每个 task 的 respChan 数据来做排序。

0 人点赞