Upsert API 写入 S3 的流程
Milvus 版本：v2.3.2
实现方式：先 insert 再 delete，并且不允许修改主键列。
整体架构:
Upsert 的数据流向
Upsert 写入 S3 的流程
Upsert 先执行 insert，再执行 delete，这一点可以从 proxy 中 upsertTask 的 Execute() 方法看出。
代码语言:go复制func (it *upsertTask) Execute(ctx context.Context) (err error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
defer sp.End()
log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))
// 拿到stream,类型为msgstream.mqMsgStream
stream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
if err != nil {
return err
}
// 创建msgPack
msgPack := &msgstream.MsgPack{
BeginTs: it.BeginTs(),
EndTs: it.EndTs(),
}
// 添加insertMsgPack
err = it.insertExecute(ctx, msgPack)
if err != nil {
log.Warn("Fail to insertExecute", zap.Error(err))
return err
}
// 添加deleteMsgPack
err = it.deleteExecute(ctx, msgPack)
if err != nil {
log.Warn("Fail to deleteExecute", zap.Error(err))
return err
}
tr.RecordSpan()
// 发送数据至mq
err = stream.Produce(msgPack)
if err != nil {
it.result.Status = merr.Status(err)
return err
}
sendMsgDur := tr.RecordSpan()
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
totalDur := tr.ElapseSpan()
log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
zap.Duration("total duration", totalDur))
return nil
}
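Execute() 先后调用 insertExecute 和 deleteExecute，把 insert 消息和 delete 消息追加到同一个 msgPack 中，再通过 stream.Produce 发送到 MQ，随后由 datanode 消费。
这些 insert 消息和 delete 消息在 datanode 侧的处理，分别与单独执行 insert、delete 时写入 S3 的流程相同，最终分别产生 insert log 和 delta log（即 delete log）。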
// Operate handles input messages, implementing flowgraph.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
// Excerpt only: every "......" line marks code omitted from the Milvus source.
// NOTE(review): msMsg is presumably unpacked from `in` in the omitted
// prologue — confirm against the full source.
......
// Iterate over the messages carried by msMsg and dispatch on each type.
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
......
case commonpb.MsgType_DropPartition:
......
// Handle insert messages; per the surrounding text, the insert half of an
// upsert is presumably processed here.
case commonpb.MsgType_Insert:
......
// Handle delete messages; per the surrounding text, the delete half of an
// upsert is presumably processed here.
case commonpb.MsgType_Delete:
......
}
}
......
}
S3 文件
Upsert 由 insert 和 delete 两种操作组合而成，因此它在 S3 上产生的文件，就是 insert 和 delete 各自对应的文件。
主要涉及 insert_log、delta_log 和 stats_log。
insert:
files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}(not flushed)
delete:
files/delta_log/{collectionID}/{partitionID}/{segmentID}/{logID}
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logID}(not flushed)