API请求执行流程_milvus源码解析

2023-11-13 17:37:09 浏览数 (3)

API请求执行流程

1.milvus客户端发起api rpc请求,请求内容为request。

2.proxy接受api请求,将request包装为task。

3.将task压入队列。

4.调度器执行队列中的task。

以创建collection的API(CreateCollection)为例:

1.客户端发起创建collection的请求。

代码语言:python代码运行次数:0复制
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print(f"start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus2", schema, consistency_level="Strong",shards_num=2)

2.proxy接受客户端发送过来的request,将其包装为createCollectionTask。

将createCollectionTask压入队列ddTaskQueue,等待调度器执行。

代码路径:internalproxyimpl.go

代码语言:go复制
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
	...省略...
    // 将request包装为createCollectionTask
	cct := &createCollectionTask{
		ctx:                     ctx,
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
		rootCoord:               node.rootCoord,
	}

	...省略...
    // 将task压入队列ddTaskQueue
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err))

	...省略...
    // 等待task执行完成
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", cct.BeginTs()),
			zap.Uint64("EndTs", cct.EndTs()))

	...省略...
}

3.调度器执行队列中的task。

会依次执行cct的PreExecute()、Execute()、PostExecute()方法。

PreExecute()一般用来做预处理。

Execute()真正执行task的任务。

PostExecute()用来task完成后执行的动作,一般直接返回nil,也就是什么都不做。

代码路径:internalproxytask.go

代码语言:go复制
type createCollectionTask struct {
	Condition
	*milvuspb.CreateCollectionRequest
	ctx       context.Context
	rootCoord types.RootCoordClient
	result    *commonpb.Status
	schema    *schemapb.CollectionSchema
}

func (t *createCollectionTask) PreExecute(ctx context.Context) error {
	...省略...
}

func (t *createCollectionTask) Execute(ctx context.Context) error {
	var err error
	t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
	return err
}

func (t *createCollectionTask) PostExecute(ctx context.Context) error {
	return nil
}

为什么会是PreExecute()、Execute()、PostExecute()这个顺序,这个就需要阅读task调度器的源码了。

代码路径:internalproxytask_scheduler.go

核心代码如下:

task压入队列后执行的是processTask()方法。

代码语言:go复制
func (sched *taskScheduler) processTask(t task, q taskQueue) {
	......

	err := t.PreExecute(ctx)

	......
	err = t.Execute(ctx)
	
	......
	err = t.PostExecute(ctx)
    ......
}

这里再思考另一个问题,processTask()是由谁调用的,调度器是什么时候启动的。

task_scheduler.go有一个方法Start()。由这个方法启动一个goroutine进行调度。

代码语言:go复制
// ddQueue *ddTaskQueue
// dmQueue *dmTaskQueue
// dqQueue *dqTaskQueue
// dcQueue *ddTaskQueue

func (sched *taskScheduler) Start() error {
	sched.wg.Add(1)
    // ddQueue的调度,数据定义的task
	go sched.definitionLoop()

	sched.wg.Add(1)
    // dcQueue的调度,数据控制的task
	go sched.controlLoop()

	sched.wg.Add(1)
    // dmQueue的调度,数据操作的task
	go sched.manipulationLoop()

	sched.wg.Add(1)
    // dqQueue的调度,数据查询的task
	go sched.queryLoop()

	return nil
}

createCollectionTask是数据定义语言,走go sched.definitionLoop()这条路径。

代码语言:go复制
// definitionLoop schedules the ddl tasks.
func (sched *taskScheduler) definitionLoop() {
	defer sched.wg.Done()
	for {
		select {
		case <-sched.ctx.Done():
			return
		case <-sched.ddQueue.utChan():
			if !sched.ddQueue.utEmpty() {
				t := sched.scheduleDdTask()
				sched.processTask(t, sched.ddQueue)
			}
		}
	}
}

在这里可以看到processTask()方法的调用。for循环里,只要通道有值就会调用processTask()方法。

这样PreExecute()、Execute()、PostExecute()的逻辑就搞清楚了。

0 人点赞