From 3e17d21d9292bff08cf45f22ee08cacb623cde73 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Thu, 18 Jul 2024 10:08:39 +0800 Subject: [PATCH] enhance: [cp24]Add ut for l0CompactionTask processExecuting See also: #34796 pr: #34800 Signed-off-by: yangxuan --- internal/datacoord/compaction_task_l0.go | 14 +- internal/datacoord/compaction_task_l0_test.go | 155 +++++++++++++++--- 2 files changed, 145 insertions(+), 24 deletions(-) diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index db379c231f78c..575f2c4879935 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -106,10 +106,11 @@ func (t *l0CompactionTask) processExecuting() bool { } switch result.GetState() { case datapb.CompactionTaskState_executing: + // will L0Compaction be timeouted? if t.checkTimeout() { err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) if err != nil { - log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + log.Warn("l0CompactionTask failed to set task timeout state", zap.Error(err)) return false } return t.processTimeout() @@ -122,12 +123,13 @@ func (t *l0CompactionTask) processExecuting() bool { } if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil { + log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err)) return false } return t.processMetaSaved() case datapb.CompactionTaskState_failed: if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil { - log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err)) return false } return t.processFailed() @@ -349,11 +351,15 @@ func (t *l0CompactionTask) processFailed() bool { func (t *l0CompactionTask) checkTimeout() bool { if t.GetTimeoutInSeconds() > 0 { - diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds() + start := time.Unix(t.GetStartTime(), 0) + diff := time.Since(start).Seconds() if diff > float64(t.GetTimeoutInSeconds()) { log.Warn("compaction timeout", + zap.Int64("taskID", t.GetTriggerID()), + zap.Int64("planID", t.GetPlanID()), + zap.Int64("nodeID", t.GetNodeID()), zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()), - zap.Int64("startTime", t.GetStartTime()), + zap.Time("startTime", start), ) return true } diff --git a/internal/datacoord/compaction_task_l0_test.go b/internal/datacoord/compaction_task_l0_test.go index f337017efd99e..92f8a07548692 100644 --- a/internal/datacoord/compaction_task_l0_test.go +++ b/internal/datacoord/compaction_task_l0_test.go @@ -2,17 +2,40 @@ package datacoord import ( "context" + "testing" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" ) -func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() { +func TestL0CompactionTaskSuite(t *testing.T) { + suite.Run(t, new(L0CompactionTaskSuite)) +} + +type L0CompactionTaskSuite struct { + suite.Suite + + mockMeta *MockCompactionMeta + mockSessMgr *MockSessionManager +} + +func (s *L0CompactionTaskSuite) SetupTest() { + s.mockMeta = NewMockCompactionMeta(s.T()) + s.mockSessMgr = NewMockSessionManager(s.T()) +} + +func (s *L0CompactionTaskSuite) SetupSubTest() { + s.SetupTest() +} + +func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() { channel := "Ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} @@ -69,7 +92,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_NormalL0() { s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs) } -func (s *CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() { +func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() { channel := "Ch-1" s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { return nil @@ -94,7 +117,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SegmentNotFoundL0() { s.ErrorIs(err, merr.ErrSegmentNotFound) } -func (s *CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() { +func (s *L0CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() { channel := "Ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(func(segID int64) *SegmentInfo { @@ -125,7 +148,7 @@ func (s *CompactionTaskSuite) TestProcessRefreshPlan_SelectZeroSegmentsL0() { s.Error(err) } -func generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask { +func (s *L0CompactionTaskSuite) generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask { return &l0CompactionTask{ CompactionTask: &datapb.CompactionTask{ PlanID: 1, @@ -137,16 +160,14 @@ func generateTestL0Task(state datapb.CompactionTaskState) *l0CompactionTask { State: state, InputSegments: []int64{100, 101}, }, + meta: s.mockMeta, + sessions: s.mockSessMgr, } } -func (s *CompactionTaskSuite) SetupSubTest() { - s.SetupTest() -} - -func (s *CompactionTaskSuite) TestProcessStateTrans() { +func (s *L0CompactionTaskSuite) TestStateTrans() { s.Run("test pipelining needReassignNodeID", func() { - t := generateTestL0Task(datapb.CompactionTaskState_pipelining) + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = NullNodeID got := t.Process() s.False(got) @@ -155,12 +176,11 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { }) s.Run("test pipelining BuildCompactionRequest failed", func() { - t := generateTestL0Task(datapb.CompactionTaskState_pipelining) + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - t.meta = s.mockMeta s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( []*SegmentInfo{ {SegmentInfo: &datapb.SegmentInfo{ @@ -183,7 +203,6 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return() - t.sessions = s.mockSessMgr s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() got := t.Process() @@ -192,12 +211,11 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { }) s.Run("test pipelining Compaction failed", func() { - t := generateTestL0Task(datapb.CompactionTaskState_pipelining) + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - t.meta = s.mockMeta s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( []*SegmentInfo{ {SegmentInfo: &datapb.SegmentInfo{ @@ -219,7 +237,6 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { }).Twice() s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) - t.sessions = s.mockSessMgr s.mockSessMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error { s.Require().EqualValues(t.NodeID, nodeID) return errors.New("mock error") @@ -232,12 +249,11 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { }) s.Run("test pipelining success", func() { - t := generateTestL0Task(datapb.CompactionTaskState_pipelining) + t := s.generateTestL0Task(datapb.CompactionTaskState_pipelining) t.NodeID = 100 channel := "ch-1" deltaLogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - t.meta = s.mockMeta s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( []*SegmentInfo{ {SegmentInfo: &datapb.SegmentInfo{ @@ -259,7 +275,6 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { }).Twice() s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() - t.sessions = s.mockSessMgr s.mockSessMgr.EXPECT().Compaction(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, plan *datapb.CompactionPlan) error { s.Require().EqualValues(t.NodeID, nodeID) return nil @@ -267,6 +282,106 @@ func (s *CompactionTaskSuite) TestProcessStateTrans() { got := t.Process() s.False(got) - s.Equal(datapb.CompactionTaskState_executing, t.State) + s.Equal(datapb.CompactionTaskState_executing, t.GetState()) + }) + + // stay in executing state when GetCompactionPlanResults error except ErrNodeNotFound + s.Run("test executing GetCompactionPlanResult fail NodeNotFound", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_executing) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).Return(nil, merr.WrapErrNodeNotFound(t.NodeID)).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once() + + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_pipelining, t.GetState()) + s.EqualValues(NullNodeID, t.GetNodeID()) + }) + + // stay in executing state when GetCompactionPlanResults error except ErrNodeNotFound + s.Run("test executing GetCompactionPlanResult fail mock error", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_executing) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything).Return(nil, errors.New("mock error")).Times(12) + for i := 0; i < 12; i++ { + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_executing, t.GetState()) + s.EqualValues(100, t.GetNodeID()) + } + }) + + s.Run("test executing with result executing", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_executing) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything). + Return(&datapb.CompactionPlanResult{ + PlanID: t.GetPlanID(), + State: datapb.CompactionTaskState_executing, + }, nil).Twice() + + got := t.Process() + s.False(got) + + // test timeout + t.StartTime = time.Now().Add(-time.Hour).Unix() + t.TimeoutInSeconds = 10 + + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil) + s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false). + RunAndReturn(func(inputs []int64, compacting bool) { + s.ElementsMatch(inputs, t.GetInputSegments()) + s.False(compacting) + }).Once() + + got = t.Process() + s.True(got) + s.Equal(datapb.CompactionTaskState_timeout, t.GetState()) + }) + + s.Run("test executing with result executing timeout and updataAndSaveTaskMeta failed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_executing) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything). + Return(&datapb.CompactionPlanResult{ + PlanID: t.GetPlanID(), + State: datapb.CompactionTaskState_executing, + }, nil).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() + + t.StartTime = time.Now().Add(-time.Hour).Unix() + t.TimeoutInSeconds = 10 + + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_executing, t.GetState()) + }) + + s.Run("test executing with result completed", func() { + t := s.generateTestL0Task(datapb.CompactionTaskState_executing) + t.NodeID = 100 + s.Require().True(t.GetNodeID() > 0) + + s.mockSessMgr.EXPECT().GetCompactionPlanResult(t.NodeID, mock.Anything). + Return(&datapb.CompactionPlanResult{ + PlanID: t.GetPlanID(), + State: datapb.CompactionTaskState_completed, + }, nil).Once() + s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(errors.New("mock error")).Once() + + t.StartTime = time.Now().Add(-time.Hour).Unix() + t.TimeoutInSeconds = 10 + + got := t.Process() + s.False(got) + s.Equal(datapb.CompactionTaskState_executing, t.GetState()) }) }