# CreateIndex API执行流程_milvus源码解析2

2023-11-30 15:14:24 浏览数 (1)

CreateIndex API执行流程源码解析2

milvus版本:v2.3.2

上一篇介绍了CreateIndex对etcd元数据的操作,这里介绍另一个操作。

整体架构:

CreateIndex 的数据流向:

1.dataCoord执行CreateIndex。

代码语言:python代码运行次数:0复制
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
	......
	// 分配indexID,indexID=0
	indexID, err := s.meta.CanCreateIndex(req)
	......

	err = s.meta.CreateIndex(index)
	......

	// 将collectionID发送到channel,其它的goroutine进行消费。
	select {
	case s.notifyIndexChan <- req.GetCollectionID():
	default:
	}

	......
}

上一篇已经分析过s.meta.CreateIndex(),这里重点分析s.notifyIndexChan。将collectionID发送到channel,其它的goroutine进行消费。

2.消费notifyIndexChan也在dataCoord。

代码路径:internaldatacoordindex_service.go

启动dataCoord服务的时候会调用此方法。

代码语言:go复制
func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
	log.Info("start create index for segment loop...")
	defer s.serverLoopWg.Done()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Warn("DataCoord context done, exit...")
			return
		case <-ticker.C:
			segments := s.meta.GetHasUnindexTaskSegments()
			for _, segment := range segments {
				if err := s.createIndexesForSegment(segment); err != nil {
					log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
					continue
				}
			}
		case collectionID := <-s.notifyIndexChan:
			log.Info("receive create index notify", zap.Int64("collectionID", collectionID))
			// 获取collection的segment信息
			segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
				return isFlush(info) && collectionID == info.CollectionID
			})
			for _, segment := range segments {
				if err := s.createIndexesForSegment(segment); err != nil {
					log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
					continue
				}
			}
		case segID := <-s.buildIndexCh:
			log.Info("receive new flushed segment", zap.Int64("segmentID", segID))
			segment := s.meta.GetSegment(segID)
			if segment == nil {
				log.Warn("segment is not exist, no need to build index", zap.Int64("segmentID", segID))
				continue
			}
			if err := s.createIndexesForSegment(segment); err != nil {
				log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
				continue
			}
		}
	}
}

走case collectionID := <-s.notifyIndexChan这个分支。

3.进入s.createIndexesForSegment()

代码语言:go复制
func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
	// 要创建的索引的信息:索引名称,维度等信息
	indexes := s.meta.GetIndexesForCollection(segment.CollectionID, "")
	for _, index := range indexes {
		if _, ok := segment.segmentIndexes[index.IndexID]; !ok {
			if err := s.createIndexForSegment(segment, index.IndexID); err != nil {
				log.Warn("create index for segment fail", zap.Int64("segmentID", segment.ID),
					zap.Int64("indexID", index.IndexID))
				return err
			}
		}
	}
	return nil
}

4.进入s.createIndexForSegment()

在segment上建立索引。

代码语言:go复制
func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) error {
	log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID))
	buildID, err := s.allocator.allocID(context.Background())
	if err != nil {
		return err
	}
	segIndex := &model.SegmentIndex{
		SegmentID:    segment.ID,
		CollectionID: segment.CollectionID,
		PartitionID:  segment.PartitionID,
		NumRows:      segment.NumOfRows,
		IndexID:      indexID,
		BuildID:      buildID,
		CreateTime:   uint64(segment.ID),
		WriteHandoff: false,
	}
	// 添加segment-index
	// 前缀/segment-index/{collectionID}/{partitionID}/{segmentID}/{buildID}
	if err = s.meta.AddSegmentIndex(segIndex); err != nil {
		return err
	}
	// 入队,通知IndexNode执行索引创建任务
	s.indexBuilder.enqueue(buildID)
	return nil
}

segIndex变量:

s.meta.AddSegmentIndex(segIndex)在etcd添加segment-index。

==前缀/segment-index/{collectionID}/{partitionID}/{segmentID}/{buildID}==

代码语言:go复制
func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error {
    // key规则
	key := BuildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID)

	value, err := proto.Marshal(model.MarshalSegmentIndexModel(segIdx))
	if err != nil {
		return err
	}
    // 写入etcd
	err = kc.MetaKv.Save(key, string(value))
	if err != nil {
		log.Error("failed to save segment index meta in etcd", zap.Int64("buildID", segIdx.BuildID),
			zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err))
		return err
	}
	return nil
}

5.进入s.indexBuilder.enqueue(buildID)

此函数作用:入队,通知IndexNode执行索引创建任务。

6.进入process()

代码语言:go复制
func (ib *indexBuilder) process(buildID UniqueID) bool {
	......
	// 有一个定时器,默认1秒,配置项indexCoord.scheduler.interval
	switch state {
	case indexTaskInit:
		segment := ib.meta.GetSegment(meta.SegmentID)
		if !isSegmentHealthy(segment) || !ib.meta.IsIndexExist(meta.CollectionID, meta.IndexID) {
			log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID))
			if err := ib.meta.DeleteTask(buildID); err != nil {
				log.Ctx(ib.ctx).Warn("IndexCoord delete index failed", zap.Int64("buildID", buildID), zap.Error(err))
				return false
			}
			deleteFunc(buildID)
			return true
		}
		indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID)
		if isFlatIndex(getIndexType(indexParams)) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
			// 数量小于最低数量无需建立索引,默认为1024
			log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
				zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
			if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
				BuildID:        buildID,
				State:          commonpb.IndexState_Finished,
				IndexFileKeys:  nil,
				SerializedSize: 0,
				FailReason:     "",
			}); err != nil {
				log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
				return false
			}
			updateStateFunc(buildID, indexTaskDone)
			return true
		}
		// peek client
		// if all IndexNodes are executing task, wait for one of them to finish the task.
		nodeID, client := ib.nodeManager.PeekClient(meta)
		if client == nil {
			log.Ctx(ib.ctx).WithRateGroup("dc.indexBuilder", 1, 60).RatedInfo(5, "index builder peek client error, there is no available")
			return false
		}
		// update version and set nodeID
		if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil {
			log.Ctx(ib.ctx).Warn("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
			return false
		}
		// binLogs为向量数据文件的位置信息
		// files/insert_log/444517122896489678/444517122896489679/444517122896489694/102/444517122896236031
		// files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/444517122896236031
		binLogs := make([]string, 0)
		fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
		for _, fieldBinLog := range segment.GetBinlogs() {
			if fieldBinLog.GetFieldID() == fieldID {
				for _, binLog := range fieldBinLog.GetBinlogs() {
					binLogs = append(binLogs, binLog.LogPath)
				}
				break
			}
		}

		typeParams := ib.meta.GetTypeParams(meta.CollectionID, meta.IndexID)

		var storageConfig *indexpb.StorageConfig
		// 获取元数据存储类型:minio
		if Params.CommonCfg.StorageType.GetValue() == "local" {
			storageConfig = &indexpb.StorageConfig{
				RootPath:    Params.LocalStorageCfg.Path.GetValue(),
				StorageType: Params.CommonCfg.StorageType.GetValue(),
			}
		} else {
			storageConfig = &indexpb.StorageConfig{
				Address:          Params.MinioCfg.Address.GetValue(),
				AccessKeyID:      Params.MinioCfg.AccessKeyID.GetValue(),
				SecretAccessKey:  Params.MinioCfg.SecretAccessKey.GetValue(),
				UseSSL:           Params.MinioCfg.UseSSL.GetAsBool(),
				BucketName:       Params.MinioCfg.BucketName.GetValue(),
				RootPath:         Params.MinioCfg.RootPath.GetValue(),
				UseIAM:           Params.MinioCfg.UseIAM.GetAsBool(),
				IAMEndpoint:      Params.MinioCfg.IAMEndpoint.GetValue(),
				StorageType:      Params.CommonCfg.StorageType.GetValue(),
				Region:           Params.MinioCfg.Region.GetValue(),
				UseVirtualHost:   Params.MinioCfg.UseVirtualHost.GetAsBool(),
				CloudProvider:    Params.MinioCfg.CloudProvider.GetValue(),
				RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
			}
		}
		req := &indexpb.CreateJobRequest{
			ClusterID:           Params.CommonCfg.ClusterPrefix.GetValue(),
			IndexFilePrefix:     path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
			BuildID:             buildID,
			DataPaths:           binLogs,
			IndexVersion:        meta.IndexVersion   1,
			StorageConfig:       storageConfig,
			IndexParams:         indexParams,
			TypeParams:          typeParams,
			NumRows:             meta.NumRows,
			CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
		}
		// 通知IndexNode创建索引
		if err := ib.assignTask(client, req); err != nil {
			......
		}
		......
		// update index meta state to InProgress
		// 更新索引状态
		if err := ib.meta.BuildIndex(buildID); err != nil {
			......
		}
		updateStateFunc(buildID, indexTaskInProgress)

	case indexTaskDone:
		......

	default:
		......
	}
	return true
}

ib.assignTask()通知indexNode创建索引。

7.进入ib.assignTask()

代码语言:go复制
// assignTask sends the index task to the IndexNode, it has a timeout interval, if the IndexNode doesn't respond within
// the interval, it is considered that the task sending failed.
func (ib *indexBuilder) assignTask(builderClient types.IndexNodeClient, req *indexpb.CreateJobRequest) error {
	......
    // rpc调用indexNode
	resp, err := builderClient.CreateJob(ctx, req)
	......
}

8.进入builderClient.CreateJob()

代码路径:internalindexnodeindexnode_service.go

代码语言:go复制
func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
	......
	task := &indexBuildTask{
		ident:          fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
		ctx:            taskCtx,
		cancel:         taskCancel,
		BuildID:        req.GetBuildID(),
		ClusterID:      req.GetClusterID(),
		node:           i,
		req:            req,
		cm:             cm,
		nodeID:         i.GetNodeID(),
		tr:             timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
		serializedSize: 0,
	}
	ret := merr.Success()
	if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
		......
	}
	......
}

task执行逻辑:

代码语言:go复制
pipelines := []func(context.Context) error{t.Prepare, t.BuildIndex, t.SaveIndexFiles}

依次执行task的Prepare()、BuildIndex()、SaveIndexFiles()方法。

9.进入BuildIndex()

代码语言:go复制
func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
	......
	var buildIndexInfo *indexcgowrapper.BuildIndexInfo
	buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
	......
	err = buildIndexInfo.AppendFieldMetaInfo(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType)
	......

	err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
	......

	err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
	......

	jsonIndexParams, err := json.Marshal(it.newIndexParams)
	......

	err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
	......

	for _, path := range it.req.GetDataPaths() {
		err = buildIndexInfo.AppendInsertFile(path)
		if err != nil {
			log.Ctx(ctx).Warn("append insert binlog path failed", zap.Error(err))
			return err
		}
	}

	it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
	if err := buildIndexInfo.AppendIndexEngineVersion(it.currentIndexVersion); err != nil {
		log.Ctx(ctx).Warn("append index engine version failed", zap.Error(err))
		return err
	}
	// cgo层调用CreateIndex()
    // it.index有个Upload()方法写入s3(SaveIndexFiles方法里调用it.index.Upload()写入s3)
	it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo)
	if err != nil {
		......
	}

	......
}

indexcgowrapper.CreateIndex()调用C层创建索引。

10.进入SaveIndexFiles()

代码语言:go复制
func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
	......
	// c  层上传索引文件到s3
	indexFilePath2Size, err := it.index.UpLoad()
	......

}

总结:

  • CreateIndex由proxy传递给协调器dataCoord操作etcd。
  • 前缀/field-index/{collectionID}/{IndexID}
  • 前缀/segment-index/{collectionID}/{partitionID}/{segmentID}/{buildID}
  • dataCoord通知indexNode创建索引,indexNode通过cgo调用C 层创建索引,并上传至s3。

0 人点赞