dataCoord的Compaction分析2

2023-12-08 16:21:28 浏览数 (2)

dataCoord的Compaction分析2

milvus版本:2.3.2

流程图:

compaction用来合并对象存储的小文件,将小的segment合并为大的segment。

Compaction 有一个配置项来控制是否启用自动压缩。此配置是全局的,会影响系统中的所有集合。

代码语言:txt复制
dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true

enableAutoCompaction生效的前提是enableCompaction为true。

增加了collection级别的控制。

compaction相关参数(全局):

代码语言:shell复制
dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 #默认60秒,触发compaction信号
dataCoord.compaction.check.interval = 10 #默认10秒,更新状态
dataCoord.segment.smallProportion = 0.5 #默认0.5
dataCoord.compaction.max.segment = 30 #默认30
dataCoord.compaction.min.segment = 3 #默认3

在collection级别设置属性:

代码语言:shell复制
collection.autocompaction.enabled = true

python设置代码:

代码语言:python代码运行次数:0复制
hello_milvus.set_properties({"collection.autocompaction.enabled": True})

1.执行compaction的代码

代码语言:go复制
func (t *compactionTrigger) start() {
	t.quit = make(chan struct{})
	t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
	t.wg.Add(2)
	go func() {
		defer logutil.LogPanic()
		defer t.wg.Done()

		for {
			select {
			case <-t.quit:
				log.Info("compaction trigger quit")
				return
			case signal := <-t.signals:
				switch {
				case signal.isGlobal:
					// 处理全局compaction信号
					t.handleGlobalSignal(signal)
				default:
					// collection级别信号
					t.handleSignal(signal)
					// shouldn't reset, otherwise a frequent flushed collection will affect other collections
					// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
				}
			}
		}
	}()
	// 触发全局compaction信号
	go t.startGlobalCompactionLoop()
}

t.handleGlobalSignal(signal) 用来处理全局信号。

t.handleSignal(signal) 用来处理collection级别的信号。

go t.startGlobalCompactionLoop() 定时触发全局信号。

collection信号在flush的时候触发。

2.触发全局

代码语言:go复制
// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
	id, err := t.allocSignalID()
	if err != nil {
		return err
	}
	signal := &compactionSignal{
		id:       id,
		isForce:  false,
		isGlobal: true,
	}
	t.signals <- signal
	return nil
}

3.触发collection级别

代码语言:go复制
// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
	// If AutoCompaction disabled, flush request will not trigger compaction
	if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
		return nil
	}

	id, err := t.allocSignalID()
	if err != nil {
		return err
	}
	signal := &compactionSignal{
		id:           id,
		isForce:      false,
		isGlobal:     false,
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		channel:      channel,
	}
	t.signals <- signal
	return nil
}

调用堆栈:

代码语言:txt复制
SaveBinlogPaths()(internaldatacoordservices.go)
  |--s.compactionTrigger.triggerSingleCompaction()

当调用flush的时候触发。

代码语言:go复制
// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
	......

	if req.GetFlushed() {
		s.segmentManager.DropSegment(ctx, req.SegmentID)
		s.flushCh <- req.SegmentID

		if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
			err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
				segmentID, segment.GetInsertChannel())
			if err != nil {
				log.Warn("failed to trigger single compaction")
			} else {
				log.Info("compaction triggered for segment")
			}
		}
	}
	return merr.Success(), nil
}

4.进入handleSignal()

代码语言:go复制
// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
	......

	if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
		log.RatedInfo(20, "collection auto compaction disabled",
			zap.Int64("collectionID", collectionID),
		)
		return
	}

	......

	plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
	for _, plan := range plans {
		......
	}
}

isCollectionAutoCompactionEnabled()判断是否设置collection级别。

代码语言:go复制
func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
	enabled, err := getCollectionAutoCompactionEnabled(coll.Properties)
	if err != nil {
		log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err))
		return false
	}
	return enabled
}

进入getCollectionAutoCompactionEnabled():

代码语言:go复制
// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
// if not set, returns global auto compaction config.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
	v, ok := properties[common.CollectionAutoCompactionKey]
	if ok {
		enabled, err := strconv.ParseBool(v)
		if err != nil {
			return false, err
		}
		return enabled, nil
	}
	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
}

common.CollectionAutoCompactionKey=collection.autocompaction.enabled

0 人点赞