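Analysis of dataCoord Compaction, Part 2
Milvus version: 2.3.2
Flowchart:
Compaction merges small files in object storage by combining small segments into larger ones.
A configuration option controls whether automatic compaction is enabled. This option is global and affects every collection in the system.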
```txt
dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
```
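enableAutoCompaction takes effect only when enableCompaction is true.
Collection-level control has also been added, so an individual collection can override the global setting.
Compaction-related parameters (global):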
```shell
dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 # default 60 seconds; interval at which the global compaction signal is triggered
dataCoord.compaction.check.interval = 10 # default 10 seconds; interval at which compaction state is updated
dataCoord.segment.smallProportion = 0.5 # default 0.5
dataCoord.compaction.max.segment = 30 # default 30
dataCoord.compaction.min.segment = 3 # default 3
```
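To set the property at the collection level:
```shell
collection.autocompaction.enabled = true
```
Setting it from Python:
```python
# hello_milvus is assumed to be an existing pymilvus Collection object
hello_milvus.set_properties({"collection.autocompaction.enabled": True})
```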
1. The code that runs compaction
```go
func (t *compactionTrigger) start() {
	t.quit = make(chan struct{})
	t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
	t.wg.Add(2)
	go func() {
		defer logutil.LogPanic()
		defer t.wg.Done()
		for {
			select {
			case <-t.quit:
				log.Info("compaction trigger quit")
				return
			case signal := <-t.signals:
				switch {
				case signal.isGlobal:
					// handle the global compaction signal
					t.handleGlobalSignal(signal)
				default:
					// collection-level signal
					t.handleSignal(signal)
					// shouldn't reset, otherwise a frequent flushed collection will affect other collections
					// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
				}
			}
		}
	}()
	// periodically emit the global compaction signal
	go t.startGlobalCompactionLoop()
}
```
t.handleGlobalSignal(signal) handles global signals.
t.handleSignal(signal) handles collection-level signals.
go t.startGlobalCompactionLoop() emits the global signal on a timer.
Collection-level signals are emitted when a flush happens.
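The dispatch pattern in start() can be tried out in isolation. The following is a minimal sketch, not the Milvus implementation: the types `signal` and `trigger`, the helper `route`, and the `handled` slice are hypothetical stand-ins introduced only for illustration. It keeps the same shape as the original: one goroutine consumes signals and routes them by `isGlobal`, and a second goroutine produces a global signal on every tick.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// signal is a simplified, hypothetical stand-in for compactionSignal.
type signal struct {
	id       int64
	isGlobal bool
}

// trigger is a simplified, hypothetical stand-in for compactionTrigger.
type trigger struct {
	signals chan *signal
	quit    chan struct{}
	wg      sync.WaitGroup
	handled []string // written only by the consumer goroutine
}

func newTrigger() *trigger {
	return &trigger{signals: make(chan *signal, 16), quit: make(chan struct{})}
}

// route mirrors the switch in start(): global signals go to one handler,
// everything else is treated as a collection-level signal.
func route(s *signal) string {
	if s.isGlobal {
		return "global"
	}
	return "collection"
}

func (t *trigger) start(interval time.Duration) {
	t.wg.Add(2)
	// Consumer: dispatch each incoming signal until quit is closed.
	go func() {
		defer t.wg.Done()
		for {
			select {
			case <-t.quit:
				return
			case s := <-t.signals:
				t.handled = append(t.handled, route(s))
			}
		}
	}()
	// Producer: emit a global signal on every tick.
	go func() {
		defer t.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-t.quit:
				return
			case <-ticker.C:
				select {
				case t.signals <- &signal{isGlobal: true}:
				case <-t.quit:
					return
				}
			}
		}
	}()
}

// stop closes quit and waits for both goroutines to exit.
func (t *trigger) stop() {
	close(t.quit)
	t.wg.Wait()
}

func main() {
	t := newTrigger()
	t.start(10 * time.Millisecond)
	t.signals <- &signal{id: 1} // what a flush would enqueue
	time.Sleep(50 * time.Millisecond)
	t.stop()
	fmt.Println(t.handled) // exact contents depend on timing
}
```

Because both signal kinds travel through a single channel and a single consumer, a slow handler delays every signal queued behind it. That is consistent with the comment in the original code about not resetting the global ticker from the collection-level path.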
2. Triggering the global signal
```go
// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
	id, err := t.allocSignalID()
	if err != nil {
		return err
	}
	signal := &compactionSignal{
		id:       id,
		isForce:  false,
		isGlobal: true,
	}
	t.signals <- signal
	return nil
}
```
3. Triggering the collection-level signal
```go
// triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
	// If AutoCompaction disabled, flush request will not trigger compaction
	if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
		return nil
	}
	id, err := t.allocSignalID()
	if err != nil {
		return err
	}
	signal := &compactionSignal{
		id:           id,
		isForce:      false,
		isGlobal:     false,
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		channel:      channel,
	}
	t.signals <- signal
	return nil
}
```
Call stack:
```txt
SaveBinlogPaths() (internal/datacoord/services.go)
  |--s.compactionTrigger.triggerSingleCompaction()
```
It is triggered when flush is called.
```go
// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
	......
	if req.GetFlushed() {
		s.segmentManager.DropSegment(ctx, req.SegmentID)
		s.flushCh <- req.SegmentID
		if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
			err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
				segmentID, segment.GetInsertChannel())
			if err != nil {
				log.Warn("failed to trigger single compaction")
			} else {
				log.Info("compaction triggered for segment")
			}
		}
	}
	return merr.Success(), nil
}
```
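Putting SaveBinlogPaths() and triggerSingleCompaction() together, a flush enqueues a collection-level signal only when several conditions hold at once. The sketch below condenses those checks into one function. The names `config`, `flushEvent`, and `shouldSignal` are hypothetical and exist only in this example; the conditions themselves are taken from the two functions shown above.

```go
package main

import "fmt"

// config holds the two global switches (a simplified stand-in, not Milvus's Params type).
type config struct {
	enableCompaction     bool
	enableAutoCompaction bool
}

// flushEvent is a hypothetical reduction of the fields read from SaveBinlogPathsRequest.
type flushEvent struct {
	flushed   bool
	importing bool
}

// shouldSignal reports whether a flush would enqueue a collection-level compaction signal.
func shouldSignal(cfg config, ev flushEvent) bool {
	// SaveBinlogPaths: only flushed, non-importing segments, and only if compaction is enabled.
	if !ev.flushed || ev.importing || !cfg.enableCompaction {
		return false
	}
	// triggerSingleCompaction: returns early when auto compaction is disabled.
	return cfg.enableAutoCompaction
}

func main() {
	cfg := config{enableCompaction: true, enableAutoCompaction: true}
	fmt.Println(shouldSignal(cfg, flushEvent{flushed: true}))                  // true
	fmt.Println(shouldSignal(cfg, flushEvent{flushed: true, importing: true})) // false
	cfg.enableAutoCompaction = false
	fmt.Println(shouldSignal(cfg, flushEvent{flushed: true})) // false
}
```

One detail visible in the original code: when auto compaction is disabled, triggerSingleCompaction() returns nil without sending a signal, so SaveBinlogPaths() still logs "compaction triggered for segment" even though nothing was enqueued.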
4. Entering handleSignal()
```go
// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
	......
	if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
		log.RatedInfo(20, "collection auto compaction disabled",
			zap.Int64("collectionID", collectionID),
		)
		return
	}
	......
	plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
	for _, plan := range plans {
		......
	}
}
```
isCollectionAutoCompactionEnabled() checks whether auto compaction is enabled for the collection, taking the collection-level property into account.
```go
func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
	enabled, err := getCollectionAutoCompactionEnabled(coll.Properties)
	if err != nil {
		log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err))
		return false
	}
	return enabled
}
```
Entering getCollectionAutoCompactionEnabled():
```go
// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
// if not set, returns global auto compaction config.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
	v, ok := properties[common.CollectionAutoCompactionKey]
	if ok {
		enabled, err := strconv.ParseBool(v)
		if err != nil {
			return false, err
		}
		return enabled, nil
	}
	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
}
```
The value of common.CollectionAutoCompactionKey is collection.autocompaction.enabled.
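The resolution order (collection property first, global setting as fallback, invalid values treated as disabled) is easy to check with a standalone sketch. The functions `resolve` and `shouldHandle` and the `globalDefault` parameter below are hypothetical: `globalDefault` stands in for Params.DataCoordCfg.EnableAutoCompaction, and `shouldHandle` mirrors the `isForce` check at the top of handleSignal().

```go
package main

import (
	"fmt"
	"strconv"
)

// autoCompactionKey matches the value of common.CollectionAutoCompactionKey.
const autoCompactionKey = "collection.autocompaction.enabled"

// resolve combines getCollectionAutoCompactionEnabled with the error handling
// in isCollectionAutoCompactionEnabled: an unparsable value counts as disabled.
func resolve(properties map[string]string, globalDefault bool) bool {
	v, ok := properties[autoCompactionKey]
	if !ok {
		return globalDefault // property not set: fall back to the global config
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil {
		return false
	}
	return enabled
}

// shouldHandle mirrors the guard in handleSignal: forced signals bypass the check.
func shouldHandle(isForce bool, properties map[string]string, globalDefault bool) bool {
	return isForce || resolve(properties, globalDefault)
}

func main() {
	fmt.Println(resolve(map[string]string{}, true))                           // true
	fmt.Println(resolve(map[string]string{autoCompactionKey: "false"}, true)) // false
	fmt.Println(resolve(map[string]string{autoCompactionKey: "true"}, false)) // true
	fmt.Println(resolve(map[string]string{autoCompactionKey: "yes"}, true))   // false
	fmt.Println(shouldHandle(true, map[string]string{autoCompactionKey: "false"}, false)) // true
}
```

The fourth case shows a consequence of the design: strconv.ParseBool does not accept "yes", so a mistyped property value silently disables auto compaction for that collection even when the global setting is on.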