Skip to content

Commit

Permalink
fix: compaction task not be cleaned correctly (milvus-io#34765)
Browse files Browse the repository at this point in the history
1. Fix compaction tasks not being cleaned correctly
2.add a new parameter to control compaction gc loop interval
3.remove some useless configs of clustering compaction

bug: milvus-io#34764

Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Jul 30, 2024
1 parent 39d2593 commit ce3f836
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 37 deletions.
5 changes: 2 additions & 3 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -465,16 +465,15 @@ dataCoord:
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
dropTolerance: 86400 # Compaction task will be cleaned after finish longer than this time(in seconds)
gcInterval: 1800 # The time interval in seconds for compaction gc
clustering:
enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction
triggerInterval: 600 # clustering compaction trigger interval in seconds
stateCheckInterval: 10
gcInterval: 600
minInterval: 3600 # The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction
maxInterval: 259200 # If a collection hasn't been clustering compacted for longer than maxInterval, force compact
newDataSizeThreshold: 512m # If new data size is larger than newDataSizeThreshold, execute clustering compaction
dropTolerance: 86400 # If clustering compaction job is finished for a long time, gc it
preferSegmentSize: 512m
maxSegmentSize: 1024m
maxTrainSizeRatio: 0.8 # max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit
Expand Down
9 changes: 6 additions & 3 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,10 @@ func (c *compactionPlanHandler) loopCheck() {
}

func (c *compactionPlanHandler) loopClean() {
interval := Params.DataCoordCfg.CompactionGCIntervalInSeconds.GetAsDuration(time.Second)
log.Info("compactionPlanHandler start clean check loop", zap.Any("gc interval", interval))
defer c.stopWg.Done()
cleanTicker := time.NewTicker(30 * time.Minute)
cleanTicker := time.NewTicker(interval)
defer cleanTicker.Stop()
for {
select {
Expand All @@ -400,10 +402,11 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second)) {
duration := time.Since(time.UnixMilli(task.StartTime)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
log.Debug("drop compaction task meta", zap.Int64("planID", task.PlanID))
if err != nil {
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
}
Expand Down
40 changes: 40 additions & 0 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package datacoord

import (
"context"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -759,6 +762,43 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
s.Equal(datapb.CompactionTaskState_executing, t.GetState())
}

// TestCompactionGC verifies that cleanCompactionTaskMeta drops completed or
// cleaned compaction tasks whose StartTime is older than the drop tolerance
// (default dataCoord.compaction.dropTolerance = 86400s), while keeping tasks
// that finished recently.
func (s *CompactionPlanHandlerSuite) TestCompactionGC() {
	s.SetupTest()
	inTasks := []*datapb.CompactionTask{
		{
			PlanID: 1,
			Type:   datapb.CompactionType_MixCompaction,
			State:  datapb.CompactionTaskState_completed,
			// Started ~100000s ago — exceeds dropTolerance, should be GC'ed.
			StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
		},
		{
			PlanID: 2,
			Type:   datapb.CompactionType_MixCompaction,
			State:  datapb.CompactionTaskState_cleaned,
			// Started ~100000s ago — exceeds dropTolerance, should be GC'ed.
			StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
		},
		{
			PlanID: 3,
			Type:   datapb.CompactionType_MixCompaction,
			State:  datapb.CompactionTaskState_cleaned,
			// Just started — within dropTolerance, must survive GC.
			StartTime: time.Now().UnixMilli(),
		},
	}

	catalog := &datacoord.Catalog{MetaKv: NewMetaMemoryKV()}
	compactionTaskMeta, err := newCompactionTaskMeta(context.TODO(), catalog)
	s.NoError(err)
	s.handler.meta = &meta{compactionTaskMeta: compactionTaskMeta}
	for _, t := range inTasks {
		// Assert the save succeeds; a silently dropped error here would make
		// the final count assertion pass or fail for the wrong reason.
		s.NoError(s.handler.meta.SaveCompactionTask(t))
	}

	s.handler.cleanCompactionTaskMeta()
	// Two tasks should be cleaned; one remains.
	tasks := s.handler.meta.GetCompactionTaskMeta().GetCompactionTasks()
	s.Equal(1, len(tasks))
}

func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.SetupTest()

Expand Down
41 changes: 12 additions & 29 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2976,6 +2976,7 @@ type dataCoordConfig struct {
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
Expand All @@ -2989,12 +2990,9 @@ type dataCoordConfig struct {
ClusteringCompactionEnable ParamItem `refreshable:"true"`
ClusteringCompactionAutoEnable ParamItem `refreshable:"true"`
ClusteringCompactionTriggerInterval ParamItem `refreshable:"true"`
ClusteringCompactionStateCheckInterval ParamItem `refreshable:"true"`
ClusteringCompactionGCInterval ParamItem `refreshable:"true"`
ClusteringCompactionMinInterval ParamItem `refreshable:"true"`
ClusteringCompactionMaxInterval ParamItem `refreshable:"true"`
ClusteringCompactionNewDataSizeThreshold ParamItem `refreshable:"true"`
ClusteringCompactionDropTolerance ParamItem `refreshable:"true"`
ClusteringCompactionPreferSegmentSize ParamItem `refreshable:"true"`
ClusteringCompactionMaxSegmentSize ParamItem `refreshable:"true"`
ClusteringCompactionMaxTrainSizeRatio ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3311,11 +3309,21 @@ During compaction, the size of segment # of rows is able to exceed segment max #
p.CompactionDropToleranceInSeconds = ParamItem{
Key: "dataCoord.compaction.dropTolerance",
Version: "2.4.2",
Doc: "If compaction job is finished for a long time, gc it",
Doc: "Compaction task will be cleaned after finish longer than this time(in seconds)",
DefaultValue: "86400",
Export: true,
}
p.CompactionDropToleranceInSeconds.Init(base.mgr)

p.CompactionGCIntervalInSeconds = ParamItem{
Key: "dataCoord.compaction.gcInterval",
Version: "2.4.7",
Doc: "The time interval in seconds for compaction gc",
DefaultValue: "1800",
Export: true,
}
p.CompactionGCIntervalInSeconds.Init(base.mgr)

p.CompactionCheckIntervalInSeconds = ParamItem{
Key: "dataCoord.compaction.check.interval",
Version: "2.0.0",
Expand Down Expand Up @@ -3447,22 +3455,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.ClusteringCompactionTriggerInterval.Init(base.mgr)

p.ClusteringCompactionStateCheckInterval = ParamItem{
Key: "dataCoord.compaction.clustering.stateCheckInterval",
Version: "2.4.6",
DefaultValue: "10",
Export: true,
}
p.ClusteringCompactionStateCheckInterval.Init(base.mgr)

p.ClusteringCompactionGCInterval = ParamItem{
Key: "dataCoord.compaction.clustering.gcInterval",
Version: "2.4.6",
DefaultValue: "600",
Export: true,
}
p.ClusteringCompactionGCInterval.Init(base.mgr)

p.ClusteringCompactionMinInterval = ParamItem{
Key: "dataCoord.compaction.clustering.minInterval",
Version: "2.4.6",
Expand Down Expand Up @@ -3497,15 +3489,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)

p.ClusteringCompactionDropTolerance = ParamItem{
Key: "dataCoord.compaction.clustering.dropTolerance",
Version: "2.4.6",
Doc: "If clustering compaction job is finished for a long time, gc it",
DefaultValue: "259200",
Export: true,
}
p.ClusteringCompactionDropTolerance.Init(base.mgr)

p.ClusteringCompactionPreferSegmentSize = ParamItem{
Key: "dataCoord.compaction.clustering.preferSegmentSize",
Version: "2.4.6",
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,11 @@ func TestComponentParam(t *testing.T) {
params.Save("datacoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))

params.Save("dataCoord.compaction.gcInterval", "100")
assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds())
params.Save("dataCoord.compaction.dropTolerance", "100")
assert.Equal(t, float64(100), Params.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds())

params.Save("dataCoord.compaction.clustering.enable", "true")
assert.Equal(t, true, Params.ClusteringCompactionEnable.GetAsBool())
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10")
Expand All @@ -478,8 +483,6 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
params.Save("dataCoord.compaction.clustering.newDataSizeThreshold", "10g")
assert.Equal(t, int64(10*1024*1024*1024), Params.ClusteringCompactionNewDataSizeThreshold.GetAsSize())
params.Save("dataCoord.compaction.clustering.dropTolerance", "86400")
assert.Equal(t, int64(86400), Params.ClusteringCompactionDropTolerance.GetAsInt64())
params.Save("dataCoord.compaction.clustering.maxSegmentSize", "100m")
assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize())
params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m")
Expand Down

0 comments on commit ce3f836

Please sign in to comment.