From 50da48a30d07a25a0680d4d9a5e5b0b54f811428 Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Fri, 18 Oct 2024 11:37:24 +0800 Subject: [PATCH] enhance: adding mix compaction first prioritizer (#36956) Signed-off-by: Ted Xu --- configs/milvus.yaml | 6 ++++- internal/datacoord/compaction_queue.go | 19 ++++++++++++--- internal/datacoord/compaction_queue_test.go | 26 +++++++++++++++++++-- pkg/util/paramtable/component_param.go | 7 ++++-- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 7990e46bc331d..c24c0201871a5 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -554,7 +554,11 @@ dataCoord: # This configuration takes effect only when dataCoord.enableCompaction is set as true. enableAutoCompaction: true indexBasedCompaction: true - taskPrioritizer: default # compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions. + # compaction task prioritizer, options: [default, level, mix]. + # default is FIFO. + # level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. + # mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions. + taskPrioritizer: default rpcTimeout: 10 maxParallelTaskNum: 10 workerMaxParallelTaskNum: 2 diff --git a/internal/datacoord/compaction_queue.go b/internal/datacoord/compaction_queue.go index c9b71017c4b66..c633c1665b64c 100644 --- a/internal/datacoord/compaction_queue.go +++ b/internal/datacoord/compaction_queue.go @@ -168,11 +168,22 @@ var ( return 10 case datapb.CompactionType_ClusteringCompaction: return 100 - case datapb.CompactionType_MinorCompaction: - case datapb.CompactionType_MajorCompaction: + default: + return 1000 + } + } + + MixFirstPrioritizer Prioritizer = func(task CompactionTask) int { + switch task.GetType() { + case datapb.CompactionType_Level0DeleteCompaction: + return 10 + case datapb.CompactionType_MixCompaction: + return 1 + case datapb.CompactionType_ClusteringCompaction: + return 100 + default: return 1000 } - return 0xffff } ) @@ -181,6 +192,8 @@ func getPrioritizer() Prioritizer { switch p { case "level": return LevelPrioritizer + case "mix": + return MixFirstPrioritizer default: return DefaultPrioritizer } diff --git a/internal/datacoord/compaction_queue_test.go b/internal/datacoord/compaction_queue_test.go index 794520d047673..ed05e9c875d20 100644 --- a/internal/datacoord/compaction_queue_test.go +++ b/internal/datacoord/compaction_queue_test.go @@ -43,7 +43,7 @@ func TestCompactionQueue(t *testing.T) { t3 := &clusteringCompactionTask{ CompactionTask: &datapb.CompactionTask{ PlanID: 2, - Type: datapb.CompactionType_MajorCompaction, + Type: datapb.CompactionType_ClusteringCompaction, }, } @@ -88,7 +88,29 @@ func TestCompactionQueue(t *testing.T) { assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType()) task, err = cq.Dequeue() assert.NoError(t, err) - assert.Equal(t, datapb.CompactionType_MajorCompaction, task.GetType()) + assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType()) + }) + + t.Run("mix first prioritizer", func(t *testing.T) { + cq := NewCompactionQueue(3, MixFirstPrioritizer) + err := cq.Enqueue(t1) + assert.NoError(t, err) + err = cq.Enqueue(t2) + assert.NoError(t, err) + err = cq.Enqueue(t3) + assert.NoError(t, err) + err = cq.Enqueue(&mixCompactionTask{}) + assert.Error(t, err) + + task, err := cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType()) + task, err = cq.Dequeue() + assert.NoError(t, err) + assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType()) }) t.Run("update prioritizer", func(t *testing.T) { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 337b0fef365e7..8fdeb738244b8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3465,8 +3465,11 @@ This configuration takes effect only when dataCoord.enableCompaction is set as t Key: "dataCoord.compaction.taskPrioritizer", Version: "2.5.0", DefaultValue: "default", - Doc: "compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.", - Export: true, + Doc: `compaction task prioritizer, options: [default, level, mix]. +default is FIFO. +level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions. +mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.`, + Export: true, } p.CompactionTaskPrioritizer.Init(base.mgr)