milvus Upsert api写s3的流程

2024-03-11 14:29:12 浏览数 (2)

Upsert api写s3的流程

milvus版本:v2.3.2

实现:先insert再delete,并限制不能修改主键列。

整体架构:

Upsert 的数据流向

upsert写入s3的流程

upsert先insert,再delete。从proxy的execute()方法可以看出。

代码语言:go复制
func (it *upsertTask) Execute(ctx context.Context) (err error) {
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
	defer sp.End()
	log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))

	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))
	// 拿到stream,类型为msgstream.mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
	if err != nil {
		return err
	}
    // 创建msgPack
	msgPack := &msgstream.MsgPack{
		BeginTs: it.BeginTs(),
		EndTs:   it.EndTs(),
	}
    // 添加insertMsgPack
	err = it.insertExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to insertExecute", zap.Error(err))
		return err
	}
    // 添加deleteMsgPack
	err = it.deleteExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to deleteExecute", zap.Error(err))
		return err
	}

	tr.RecordSpan()
    // 发送数据至mq
	err = stream.Produce(msgPack)
	if err != nil {
		it.result.Status = merr.Status(err)
		return err
	}
	sendMsgDur := tr.RecordSpan()
	metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
	totalDur := tr.ElapseSpan()
	log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
		zap.Duration("total duration", totalDur))
	return nil
}

将insertmsg和deletemsg加入msgPack,然后datanode进行消费。

insert和delete流程分别可以参考对应写入s3的流程。产生insertlog和deletelog。

代码语言:go复制
// Operate handles input messages, implementing flowgrpah.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
	......
    // 遍历msMsg
	for _, msg := range msMsg.TsMessages() {
		switch msg.Type() {
		case commonpb.MsgType_DropCollection:
			......

		case commonpb.MsgType_DropPartition:
			......
        // 处理insert消息
		case commonpb.MsgType_Insert:
			......
        // 处理delete消息
		case commonpb.MsgType_Delete:
			......
		}
	}

	......
}

s3文件

upsert结合了insert和delete操作。因此在s3对应的文件也即insert和delete对应的文件。

主要涉及delta_log和stats_log。

insert:

代码语言:shell复制
files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}(not flushed)

delete:

代码语言:shell复制
files/delta_log/{collID}/{partID}/{segmentID}/{logID}
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/{logID}(not flushed)

0 人点赞