diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 69a0357e6c6f7..3f82030bb9863 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -589,14 +589,7 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
 			sessions:       c.sessions,
 		}
 	case datapb.CompactionType_ClusteringCompaction:
-		task = &clusteringCompactionTask{
-			CompactionTask:   t,
-			allocator:        c.allocator,
-			meta:             c.meta,
-			sessions:         c.sessions,
-			handler:          c.handler,
-			analyzeScheduler: c.analyzeScheduler,
-		}
+		task = newClusteringCompactionTask(t, c.allocator, c.meta, c.sessions, c.handler, c.analyzeScheduler)
 	default:
 		return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
 	}
diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 19d7890528a80..3470dff6f3ac4 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -40,10 +40,6 @@ import (
 
 var _ CompactionTask = (*clusteringCompactionTask)(nil)
 
-const (
-	taskMaxRetryTimes = int32(3)
-)
-
 type clusteringCompactionTask struct {
 	*datapb.CompactionTask
 	plan *datapb.CompactionPlan
@@ -55,6 +51,20 @@ type clusteringCompactionTask struct {
 	sessions         SessionManager
 	handler          Handler
 	analyzeScheduler *taskScheduler
+
+	maxRetryTimes int32
+}
+
+func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator, meta CompactionMeta, session SessionManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask {
+	return &clusteringCompactionTask{
+		CompactionTask:   t,
+		allocator:        allocator,
+		meta:             meta,
+		sessions:         session,
+		handler:          handler,
+		analyzeScheduler: analyzeScheduler,
+		maxRetryTimes:    3,
+	}
 }
 
 func (t *clusteringCompactionTask) Process() bool {
@@ -63,12 +73,15 @@ func (t *clusteringCompactionTask) Process() bool {
 	err := t.retryableProcess()
 	if err != nil {
 		log.Warn("fail in process task", zap.Error(err))
-		if merr.IsRetryableErr(err) && t.RetryTimes < taskMaxRetryTimes {
+		if merr.IsRetryableErr(err) && t.RetryTimes < t.maxRetryTimes {
 			// retry in next Process
-			t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
+			err = t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
 		} else {
 			log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
-			t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
+			err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
+		}
+		if err != nil {
+			log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
 		}
 	}
 	// task state update, refresh retry times count
@@ -80,16 +93,20 @@ func (t *clusteringCompactionTask) Process() bool {
 		metrics.DataCoordCompactionLatency.
 			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
 			Observe(float64(lastStateDuration))
-		t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
+		updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}
 
 		if t.State == datapb.CompactionTaskState_completed {
-			t.updateAndSaveTaskMeta(setEndTime(ts))
+			updateOps = append(updateOps, setEndTime(ts))
 			elapse := ts - t.StartTime
 			log.Info("clustering compaction task total elapse", zap.Int64("elapse", elapse))
 			metrics.DataCoordCompactionLatency.
 				WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
 				Observe(float64(elapse))
 		}
+		err = t.updateAndSaveTaskMeta(updateOps...)
+		if err != nil {
+			log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
+		}
 	}
 	log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
 	return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
@@ -183,8 +200,6 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
 func (t *clusteringCompactionTask) processPipelining() error {
 	log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID()))
-	ts := time.Now().UnixMilli()
-	t.updateAndSaveTaskMeta(setStartTime(ts))
 	var operators []UpdateOperator
 	for _, segID := range t.InputSegments {
 		operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
 	}
@@ -218,8 +233,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
 		if errors.Is(err, merr.ErrNodeNotFound) {
 			log.Warn("GetCompactionPlanResult fail", zap.Error(err))
 			// setNodeID(NullNodeID) to trigger reassign node ID
-			t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
-			return nil
+			return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
 		}
 		return err
 	}
@@ -230,7 +244,6 @@ func (t *clusteringCompactionTask) processExecuting() error {
 	result := t.result
 	if len(result.GetSegments()) == 0 {
 		log.Warn("illegal compaction results, this should not happen")
-		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
 		return merr.WrapErrCompactionResult("compaction result is empty")
 	}
 
@@ -260,8 +273,10 @@ func (t *clusteringCompactionTask) processExecuting() error {
 		return nil
 	case datapb.CompactionTaskState_failed:
 		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
+	default:
+		log.Error("not support compaction task state", zap.String("state", result.GetState().String()))
+		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
 	}
-	return nil
 }
 
 func (t *clusteringCompactionTask) processMetaSaved() error {
@@ -382,8 +397,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
 		log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
 	}
 
-	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
-	return nil
+	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
 }
 
 func (t *clusteringCompactionTask) doAnalyze() error {
@@ -409,9 +423,8 @@ func (t *clusteringCompactionTask) doAnalyze() error {
 			State: indexpb.JobState_JobStateInit,
 		},
 	})
-	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
 	log.Info("submit analyze task", zap.Int64("planID", t.GetPlanID()), zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("id", t.GetAnalyzeTaskID()))
-	return nil
+	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
 }
 
 func (t *clusteringCompactionTask) doCompact() error {
@@ -445,21 +458,18 @@ func (t *clusteringCompactionTask) doCompact() error {
 	t.plan, err = t.BuildCompactionRequest()
 	if err != nil {
 		log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
-		return merr.WrapErrBuildCompactionRequestFail(err) // retryable
+		return err
 	}
 	err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
 	if err != nil {
 		if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
 			log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
-			t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
-			return nil
+			return t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
 		}
 		log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
-		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
-		return err
+		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
 	}
-	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
-	return nil
+	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
 }
 
 func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go
index 39e375e5bc764..8923b551d906a 100644
--- a/internal/datacoord/compaction_task_clustering_test.go
+++ b/internal/datacoord/compaction_task_clustering_test.go
@@ -19,8 +19,12 @@ package datacoord
 import (
 	"context"
 	"fmt"
+	"testing"
+	"time"
 
 	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+	"go.uber.org/atomic"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -28,22 +32,66 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 )
 
-func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
-	channel := "Ch-1"
+func TestClusteringCompactionTaskSuite(t *testing.T) {
+	suite.Run(t, new(ClusteringCompactionTaskSuite))
+}
+
+type ClusteringCompactionTaskSuite struct {
+	suite.Suite
+
+	mockID      atomic.Int64
+	mockAlloc   *NMockAllocator
+	meta        *meta
+	mockSessMgr *MockSessionManager
+	handler     *NMockHandler
+	session     *MockSessionManager
+}
+
+func (s *ClusteringCompactionTaskSuite) SetupTest() {
 	cm := storage.NewLocalChunkManager(storage.RootPath(""))
 	catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "")
 	meta, err := newMeta(context.TODO(), catalog, cm)
 	s.NoError(err)
-	meta.AddSegment(context.TODO(), &SegmentInfo{
+	s.meta = meta
+
+	s.mockSessMgr = NewMockSessionManager(s.T())
+
+	s.mockID.Store(time.Now().UnixMilli())
+	s.mockAlloc = NewNMockAllocator(s.T())
+	s.mockAlloc.EXPECT().allocN(mock.Anything).RunAndReturn(func(x int64) (int64, int64, error) {
+		start := s.mockID.Load()
+		end := s.mockID.Add(int64(x))
+		return start, end, nil
+	}).Maybe()
+	s.mockAlloc.EXPECT().allocID(mock.Anything).RunAndReturn(func(ctx context.Context) (int64, error) {
+		end := s.mockID.Add(1)
+		return end, nil
+	}).Maybe()
+
+	s.handler = NewNMockHandler(s.T())
+	s.handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
+
+	s.session = NewMockSessionManager(s.T())
+}
+
+func (s *ClusteringCompactionTaskSuite) SetupSubTest() {
+	s.SetupTest()
+}
+
+func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
+	channel := "Ch-1"
+
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{
 			ID:    101,
 			State: commonpb.SegmentState_Flushed,
 			Level: datapb.SegmentLevel_L1,
 		},
 	})
-	meta.AddSegment(context.TODO(), &SegmentInfo{
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{
 			ID:    102,
 			State: commonpb.SegmentState_Flushed,
@@ -52,8 +100,6 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
 		},
 	})
 	session := NewSessionManagerImpl()
-	alloc := NewNMockAllocator(s.T())
-	alloc.EXPECT().allocN(mock.Anything).Return(100, 200, nil)
 
 	schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
 	pk := &schemapb.FieldSchema{
@@ -83,22 +129,22 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
 			InputSegments:      []int64{101, 102},
 			ResultSegments:     []int64{1000, 1100},
 		},
-		meta:      meta,
+		meta:      s.meta,
 		sessions:  session,
-		allocator: alloc,
+		allocator: s.mockAlloc,
 	}
 
 	task.processPipelining()
 
-	seg11 := meta.GetSegment(101)
+	seg11 := s.meta.GetSegment(101)
 	s.Equal(datapb.SegmentLevel_L2, seg11.Level)
-	seg21 := meta.GetSegment(102)
+	seg21 := s.meta.GetSegment(102)
 	s.Equal(datapb.SegmentLevel_L2, seg21.Level)
 	s.Equal(int64(10000), seg21.PartitionStatsVersion)
 
 	task.ResultSegments = []int64{103, 104}
 	// fake some compaction result segment
-	meta.AddSegment(context.TODO(), &SegmentInfo{
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{
 			ID:    103,
 			State: commonpb.SegmentState_Flushed,
@@ -107,7 +153,7 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
 		},
 	})
-	meta.AddSegment(context.TODO(), &SegmentInfo{
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{
 			ID:    104,
 			State: commonpb.SegmentState_Flushed,
@@ -119,20 +165,198 @@ func (s *CompactionTaskSuite) TestClusteringCompactionSegmentMetaChange() {
 
 	task.processFailedOrTimeout()
 
-	seg12 := meta.GetSegment(101)
+	seg12 := s.meta.GetSegment(101)
 	s.Equal(datapb.SegmentLevel_L1, seg12.Level)
-	seg22 := meta.GetSegment(102)
+	seg22 := s.meta.GetSegment(102)
 	s.Equal(datapb.SegmentLevel_L2, seg22.Level)
 	s.Equal(int64(10000), seg22.PartitionStatsVersion)
 
-	seg32 := meta.GetSegment(103)
+	seg32 := s.meta.GetSegment(103)
 	s.Equal(datapb.SegmentLevel_L1, seg32.Level)
 	s.Equal(int64(0), seg32.PartitionStatsVersion)
 
-	seg42 := meta.GetSegment(104)
+	seg42 := s.meta.GetSegment(104)
 	s.Equal(datapb.SegmentLevel_L1, seg42.Level)
 	s.Equal(int64(0), seg42.PartitionStatsVersion)
 }
 
+func (s *ClusteringCompactionTaskSuite) generateBasicTask() *clusteringCompactionTask {
+	schema := ConstructScalarClusteringSchema("TestClusteringCompactionTask", 32, true)
+	pk := &schemapb.FieldSchema{
+		FieldID:         100,
+		Name:            Int64Field,
+		IsPrimaryKey:    true,
+		DataType:        schemapb.DataType_Int64,
+		AutoID:          true,
+		IsClusteringKey: true,
+	}
+
+	task := &clusteringCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			PlanID:             1,
+			TriggerID:          19530,
+			CollectionID:       1,
+			PartitionID:        10,
+			Type:               datapb.CompactionType_ClusteringCompaction,
+			NodeID:             1,
+			State:              datapb.CompactionTaskState_pipelining,
+			Schema:             schema,
+			ClusteringKeyField: pk,
+			InputSegments:      []int64{101, 102},
+			ResultSegments:     []int64{1000, 1100},
+		},
+		meta:      s.meta,
+		handler:   s.handler,
+		sessions:  s.session,
+		allocator: s.mockAlloc,
+	}
+	return task
+}
+
+func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
+	task := s.generateBasicTask()
+	task.maxRetryTimes = 3
+	// process pipelining fail
+	s.Equal(false, task.Process())
+	s.Equal(int32(1), task.RetryTimes)
+	s.Equal(false, task.Process())
+	s.Equal(int32(2), task.RetryTimes)
+	s.Equal(false, task.Process())
+	s.Equal(int32(3), task.RetryTimes)
+	s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
+	s.Equal(false, task.Process())
+	s.Equal(int32(0), task.RetryTimes)
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+}
+
+func (s *ClusteringCompactionTaskSuite) TestProcessStateChange() {
+	task := s.generateBasicTask()
+
+	// process pipelining fail
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+
+	// process pipelining succeed
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:    101,
+			State: commonpb.SegmentState_Flushed,
+			Level: datapb.SegmentLevel_L1,
+		},
+	})
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:                    102,
+			State:                 commonpb.SegmentState_Flushed,
+			Level:                 datapb.SegmentLevel_L2,
+			PartitionStatsVersion: 10000,
+		},
+	})
+
+	s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	task.State = datapb.CompactionTaskState_pipelining
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+
+	// process executing
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(1)).Once()
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
+
+	// repipelining
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
+	task.NodeID = 1
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+
+	// process executing
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(nil, nil).Once()
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_executing,
+	}, nil).Once()
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_executing, task.GetState())
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_completed,
+		Segments: []*datapb.CompactionSegment{
+			{
+				SegmentID: 1000,
+			},
+			{
+				SegmentID: 1001,
+			},
+		},
+	}, nil).Once()
+	s.Equal(false, task.Process())
+	s.Equal(datapb.CompactionTaskState_indexing, task.GetState())
+}
+
+func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:    101,
+			State: commonpb.SegmentState_Flushed,
+			Level: datapb.SegmentLevel_L1,
+		},
+	})
+	s.meta.AddSegment(context.TODO(), &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:                    102,
+			State:                 commonpb.SegmentState_Flushed,
+			Level:                 datapb.SegmentLevel_L2,
+			PartitionStatsVersion: 10000,
+		},
+	})
+	task := s.generateBasicTask()
+	s.session.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).Return(merr.WrapErrDataNodeSlotExhausted())
+	task.State = datapb.CompactionTaskState_pipelining
+	s.NoError(task.doCompact())
+	s.Equal(int64(NullNodeID), task.GetNodeID())
+}
+
+func (s *ClusteringCompactionTaskSuite) TestProcessExecutingState() {
+	task := s.generateBasicTask()
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_failed,
+	}, nil).Once()
+	s.NoError(task.processExecuting())
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_indexing,
+	}, nil).Once()
+	s.NoError(task.processExecuting())
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_pipelining,
+	}, nil).Once()
+	s.NoError(task.processExecuting())
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_completed,
+	}, nil).Once()
+	s.Error(task.processExecuting())
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+
+	s.session.EXPECT().GetCompactionPlanResult(mock.Anything, mock.Anything).Return(&datapb.CompactionPlanResult{
+		State: datapb.CompactionTaskState_completed,
+		Segments: []*datapb.CompactionSegment{
+			{
+				SegmentID: 1000,
+			},
+			{
+				SegmentID: 1001,
+			},
+		},
+	}, nil).Once()
+	s.Error(task.processExecuting())
+	s.Equal(datapb.CompactionTaskState_failed, task.GetState())
+}
+
 const (
 	Int64Field    = "int64Field"
 	FloatVecField = "floatVecField"
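Note on the behavioral change in this diff: `Process()` now bounds retries per task through the injectable `maxRetryTimes` field (set to 3 in `newClusteringCompactionTask`) instead of the package-level `taskMaxRetryTimes` constant, checks the error returned by every `updateAndSaveTaskMeta` call, and resets the retry counter whenever the task state changes. The standalone Go sketch below illustrates that retry pattern in isolation; `errTransient`, `task`, and `process` are hypothetical stand-ins for `merr.IsRetryableErr` and the real task/meta types, not code from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient stands in for errors that merr.IsRetryableErr would accept.
var errTransient = errors.New("transient failure")

type task struct {
	state         string
	retryTimes    int32
	maxRetryTimes int32 // injected per task; the PR sets 3 in the constructor
}

// process mirrors the shape of clusteringCompactionTask.Process: a retryable
// error bumps a bounded counter, anything else fails the task, and a state
// transition refreshes the retry budget.
func (t *task) process(do func() error) {
	lastState := t.state
	if err := do(); err != nil {
		if errors.Is(err, errTransient) && t.retryTimes < t.maxRetryTimes {
			t.retryTimes++ // retry on the next Process tick
		} else {
			t.state = "failed" // unretryable, or retry budget exhausted
		}
	}
	if t.state != lastState {
		t.retryTimes = 0 // task state update, refresh retry times count
	}
}

func main() {
	t := &task{state: "pipelining", maxRetryTimes: 3}
	for i := 0; i < 4; i++ {
		t.process(func() error { return errTransient })
		fmt.Printf("retryTimes=%d state=%s\n", t.retryTimes, t.state)
	}
	// Prints retryTimes 1, 2, 3 while still pipelining, then state=failed with
	// retryTimes reset to 0 -- the sequence TestProcessRetryLogic asserts.
}
```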