diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index b02a7c0f2bcd0..4889dc1b05b6b 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -436,6 +436,10 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu ret = append(ret, id) continue } + if info.State == commonpb.SegmentState_Flushing || + info.State == commonpb.SegmentState_Flushed { + continue + } if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil { return nil, err } @@ -526,7 +530,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { continue } channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info) - if info.State == commonpb.SegmentState_Sealed { + if info.State == commonpb.SegmentState_Sealed || + info.State == commonpb.SegmentState_Flushing || + info.State == commonpb.SegmentState_Flushed { continue } // change shouldSeal to segment seal policy logic @@ -543,7 +549,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { for _, policy := range s.channelSealPolicies { vs := policy(channel, segmentInfos, ts) for _, info := range vs { - if info.State == commonpb.SegmentState_Sealed { + if info.State == commonpb.SegmentState_Sealed || + info.State == commonpb.SegmentState_Flushing || + info.State == commonpb.SegmentState_Flushed { continue } if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil { diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 1316df2da36aa..9e32bd3aaf1ba 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -40,7 +40,7 @@ import ( ) const ( - MaxPendingCount = 5000 // TODO: Make this configurable. + MaxPendingCount = 65536 // TODO: Make this configurable. delimiter = "/" ) @@ -54,16 +54,11 @@ var checkPendingTasksInterval = 60 * 1000 var cleanUpLoopInterval = 5 * 60 * 1000 // flipPersistedTaskInterval is the default interval to loop through tasks and check if their states needs to be -// flipped/updated from `ImportPersisted` to `ImportFlushed`. +// flipped/updated from `ImportPersisted` to `ImportCompleted`. // default 2 * 1000 milliseconds (2 seconds) // TODO: Make this configurable. var flipPersistedTaskInterval = 2 * 1000 -// flipFlushedTaskInterval is the default interval to loop through tasks and check if their states needs to be -// flipped/updated from `ImportFlushed` to `ImportCompleted`. 
-// default 5 * 1000 milliseconds (5 seconds) -var flipFlushedTaskInterval = 5 * 1000 - // importManager manager for import tasks type importManager struct { ctx context.Context // reserved @@ -85,8 +80,6 @@ type importManager struct { getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error) callMarkSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) callGetSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) - callDescribeIndex func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) - callGetSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error) callUnsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) } @@ -97,8 +90,6 @@ func newImportManager(ctx context.Context, client kv.TxnKV, markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error), getSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error), getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error), - describeIndex func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error), - getSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error), unsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)) *importManager { mgr := &importManager{ ctx: ctx, @@ -115,8 +106,6 @@ func newImportManager(ctx context.Context, client kv.TxnKV, callMarkSegmentsDropped: markSegmentsDropped, callGetSegmentStates: getSegmentStates, getCollectionName: getCollectionName, - callDescribeIndex: describeIndex, - callGetSegmentIndexState: getSegmentIndexState, callUnsetIsImportingState: unsetIsImportingState, } return mgr @@ -158,9 +147,7 @@ func (m *importManager) sendOutTasksLoop(wg *sync.WaitGroup) { func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) { defer wg.Done() flipPersistedTicker := time.NewTicker(time.Duration(flipPersistedTaskInterval) * time.Millisecond) - flipFlushedTicker := time.NewTicker(time.Duration(flipFlushedTaskInterval) * time.Millisecond) defer flipPersistedTicker.Stop() - defer flipFlushedTicker.Stop() for { select { case <-m.ctx.Done(): @@ -171,11 +158,6 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) { if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil { log.Error("failed to flip ImportPersisted task", zap.Error(err)) } - case <-flipFlushedTicker.C: - log.Debug("start trying to flip ImportFlushed task") - if err := m.loadAndFlipFlushedTasks(m.ctx); err != nil { - log.Error("failed to flip ImportPersisted task", zap.Error(err)) - } } } } @@ -285,7 +267,7 @@ func (m *importManager) sendOutTasks(ctx context.Context) error { } // loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to -// `ImportFlushed` if eligible. +// `ImportCompleted` if eligible. func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error { var importTasks []*datapb.ImportTaskInfo var err error @@ -295,9 +277,9 @@ func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error { } for _, task := range importTasks { - // Checking if ImportPersisted --> ImportFlushed ready. 
+ // Checking if ImportPersisted --> ImportCompleted ready. if task.GetState().GetStateCode() == commonpb.ImportState_ImportPersisted { - log.Info("<ImportPersisted> task found, checking if it is eligible to become <ImportFlushed>", + log.Info("<ImportPersisted> task found, checking if it is eligible to become <ImportCompleted>", zap.Int64("task ID", task.GetId())) importTask := m.getTaskState(task.GetId()) @@ -312,37 +294,6 @@ func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error { return nil } -// loadAndFlipFlushedTasks checks every import task in `ImportFlushed` state and flips their import state to -// `ImportComplete` if eligible. -func (m *importManager) loadAndFlipFlushedTasks(ctx context.Context) error { - var importTasks []*datapb.ImportTaskInfo - var err error - if importTasks, err = m.loadFromTaskStore(false); err != nil { - log.Error("failed to load from task store", zap.Error(err)) - return err - } - - for _, task := range importTasks { - if task.GetState().GetStateCode() == commonpb.ImportState_ImportFlushed { - log.Info("<ImportFlushed> task found, checking if it is eligible to become <ImportCompleted>", - zap.Int64("task ID", task.GetId())) - importTask := m.getTaskState(task.GetId()) - - // TODO: if collection or partition has been dropped before the task complete, - // we need to set the task to failed, because the checkIndexingDone() cannot know - // whether the collection has been dropped. - - // if this method failed, skip this task, try again in next round - if err = m.flipTaskIndexState(ctx, importTask); err != nil { - log.Error("failed to flip task index state", - zap.Int64("task ID", task.GetId()), - zap.Error(err)) - } - } - } - return nil -} - func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *milvuspb.GetImportStateResponse, dataNodeID int64) error { ok, err := m.checkFlushDone(ctx, importTask.GetSegmentIds()) if err != nil { @@ -362,30 +313,14 @@ func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *mi zap.Int64("task ID", importTask.GetId())) }() - if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportFlushed); err != nil { - log.Error("failed to set import task state", - zap.Int64("task ID", importTask.GetId()), - zap.Any("target state", commonpb.ImportState_ImportFlushed), - zap.Error(err)) - return err - } - if err = m.sendOutTasks(m.ctx); err != nil { - log.Error("fail to send out import task to DataNodes", - zap.Int64("task ID", importTask.GetId())) + // Unset isImporting flag.
+ if m.callUnsetIsImportingState == nil { + log.Error("callUnsetIsImportingState function of importManager is nil") + return fmt.Errorf("failed to unset importing state: unset importing state method of import manager is nil") } - } - return nil -} - -func (m *importManager) flipTaskIndexState(ctx context.Context, importTask *milvuspb.GetImportStateResponse) error { - ok, err := m.checkIndexingDone(ctx, importTask.GetCollectionId(), importTask.GetSegmentIds()) - if err != nil { - log.Error("an error occurred while checking index state of segments", - zap.Int64("task ID", importTask.GetId()), - zap.Error(err)) - return err - } - if ok { + _, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{ + SegmentIds: importTask.GetSegmentIds(), + }) if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportCompleted); err != nil { log.Error("failed to set import task state", zap.Int64("task ID", importTask.GetId()), @@ -393,31 +328,17 @@ func (m *importManager) flipTaskIndexState(ctx context.Context, importTask *milv zap.Error(err)) return err } - log.Info("indexes are successfully built and the import task has complete!", - zap.Int64("task ID", importTask.GetId())) - log.Info("now start unsetting isImporting state of segments", - zap.Int64("task ID", importTask.GetId()), - zap.Int64s("segment IDs", importTask.GetSegmentIds())) - // Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state. - if m.callUnsetIsImportingState == nil { - log.Error("callUnsetIsImportingState function of importManager is nil") - return fmt.Errorf("failed to describe index: segment state method of import manager is nil") - } - status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{ - SegmentIds: importTask.GetSegmentIds(), - }) if err != nil { log.Error("failed to unset importing state of all segments (could be partial failure)", zap.Error(err)) return err } - if status.GetErrorCode() != commonpb.ErrorCode_Success { - log.Error("failed to unset importing state of all segments (could be partial failure)", - zap.Error(errors.New(status.GetReason()))) - return errors.New(status.GetReason()) + // Start working on new bulk insert tasks. + if err = m.sendOutTasks(m.ctx); err != nil { + log.Error("fail to send out import task to DataNodes", + zap.Int64("task ID", importTask.GetId())) } } - return nil } @@ -446,68 +367,6 @@ func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) ( return true, nil } -// checkIndexingDone checks if indexes are successfully built on segments in `allSegmentIDs`. -// It returns error on errors. It returns true if indexes are successfully built on all segments and returns false otherwise. -func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID, allSegmentIDs []UniqueID) (bool, error) { - if m.callDescribeIndex == nil { - log.Error("callDescribeIndex function of importManager is nil") - return false, fmt.Errorf("failed to describe index: describe index method of import manager is nil") - } - - // Check if collection has indexed fields.
- var descIdxResp *datapb.DescribeIndexResponse - var err error - if descIdxResp, err = m.callDescribeIndex(ctx, collID); err != nil { - log.Error("failed to describe index", - zap.Int64("collection ID", collID), - zap.Error(err)) - return false, err - } - if descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success && - descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_IndexNotExist { - log.Error("failed to describe index", - zap.Int64("collection ID", collID), - zap.String("reason", descIdxResp.GetStatus().GetReason())) - return false, errors.New(descIdxResp.GetStatus().GetReason()) - } - log.Info("index info retrieved for collection", - zap.Int64("collection ID", collID), - zap.Any("index info", descIdxResp.GetIndexInfos())) - if descIdxResp.GetStatus().GetErrorCode() == commonpb.ErrorCode_IndexNotExist || - len(descIdxResp.GetIndexInfos()) == 0 { - log.Info("index doesn't exist for collection", - zap.Int64("collection ID", collID)) - return true, nil - } - indexedSegmentCount := len(allSegmentIDs) - for _, indexInfo := range descIdxResp.GetIndexInfos() { - states, err := m.callGetSegmentIndexState(ctx, collID, indexInfo.GetIndexName(), allSegmentIDs) - if err != nil { - log.Error("failed to get index state in checkIndexingDone", zap.Error(err)) - return false, err - } - - // Count the # of segments with finished index. - ct := 0 - for _, s := range states { - if s.State == commonpb.IndexState_Finished { - ct++ - } - } - - if ct < indexedSegmentCount { - indexedSegmentCount = ct - } - } - - log.Info("segment indexing state checked", - zap.Int64s("segments checked", allSegmentIDs), - zap.Int("# of segments with complete index", indexedSegmentCount), - zap.Int64("collection ID", collID), - ) - return len(allSegmentIDs) == indexedSegmentCount, nil -} - func (m *importManager) isRowbased(files []string) (bool, error) { isRowBased := false for _, filePath := range files { @@ -910,7 +769,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse // other in-progress tasks as failed, when `load2Mem` is set to `true`. // loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`. 
func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) { - log.Info("import manager starts loading from Etcd") + log.Debug("import manager starts loading from Etcd") _, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath.GetValue()) if err != nil { log.Error("import manager failed to load from Etcd", zap.Error(err)) diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go index fbc5858f92117..543217767b7ae 100644 --- a/internal/rootcoord/import_manager_test.go +++ b/internal/rootcoord/import_manager_test.go @@ -71,7 +71,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ti3 := &datapb.ImportTaskInfo{ Id: 300, State: &datapb.ImportTaskState{ - StateCode: commonpb.ImportState_ImportFlushed, + StateCode: commonpb.ImportState_ImportCompleted, }, CreateTs: time.Now().Unix() - 100, } @@ -119,7 +119,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) // there are 2 tasks read from store, one is pending, the other is persisted. @@ -151,7 +151,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) var wgLoop sync.WaitGroup @@ -170,7 +170,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) assert.Panics(t, func() { mgr.init(context.TODO()) @@ -187,7 +187,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) }) @@ -201,7 +201,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) func() { @@ -221,7 +221,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := 
context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) mgr.init(ctx) var wgLoop sync.WaitGroup @@ -279,7 +279,7 @@ func TestImportManager_TestSetImportTaskState(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil) assert.NotNil(t, mgr) _, err := mgr.loadFromTaskStore(true) assert.NoError(t, err) @@ -381,7 +381,7 @@ func TestImportManager_TestEtcdCleanUp(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) assert.NotNil(t, mgr) _, err = mgr.loadFromTaskStore(true) assert.NoError(t, err) @@ -433,7 +433,7 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { ti3 := &datapb.ImportTaskInfo{ Id: 300, State: &datapb.ImportTaskState{ - StateCode: commonpb.ImportState_ImportFlushed, + StateCode: commonpb.ImportState_ImportCompleted, Segments: []int64{204, 205, 206}, }, CreateTs: time.Now().Unix() - 100, @@ -477,45 +477,6 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { }, nil } - callDescribeIndex := func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexInfos: []*datapb.IndexInfo{ - {}, - }, - }, nil - } - callGetSegmentIndexState := func(ctx context.Context, collID UniqueID, indexName string, - segIDs []UniqueID) ([]*datapb.SegmentIndexState, error) { - return []*datapb.SegmentIndexState{ - { - SegmentID: 201, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 202, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 203, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 204, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 205, - State: commonpb.IndexState_Finished, - }, - { - SegmentID: 206, - State: commonpb.IndexState_Finished, - }, - }, nil - } callUnsetIsImportingState := func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -523,7 +484,6 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { } flipPersistedTaskInterval = 20 - flipFlushedTaskInterval = 50 var wg sync.WaitGroup wg.Add(1) t.Run("normal case", func(t *testing.T) { @@ -531,7 +491,7 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - callGetSegmentStates, nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) @@ -545,15 +505,8 @@ func 
TestImportManager_TestFlipTaskStateLoop(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - callDescribeIndex = func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - callGetSegmentStates, nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) @@ -567,15 +520,8 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - callDescribeIndex = func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - callGetSegmentStates, nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) @@ -612,7 +558,7 @@ func TestImportManager_ImportJob(t *testing.T) { }, nil } // nil request - mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp := mgr.importJob(context.TODO(), nil, colID, 0) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -641,7 +587,7 @@ func TestImportManager_ImportJob(t *testing.T) { // row-based case, task count equal to file count // since the importServiceFunc return error, tasks will be kept in pending list rowReq.Files = []string{"f1.json"} - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) assert.Equal(t, 0, len(mgr.workingTasks)) @@ -654,7 +600,7 @@ func TestImportManager_ImportJob(t *testing.T) { // column-based case, one quest one task // since the importServiceFunc return error, tasks will be kept in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, 1, len(mgr.pendingTasks)) assert.Equal(t, 0, len(mgr.workingTasks)) @@ -668,13 +614,13 @@ func TestImportManager_ImportJob(t *testing.T) { } // row-based case, since the importServiceFunc return success, tasks will be sent to working list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, 
idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, len(rowReq.Files), len(mgr.workingTasks)) // column-based case, since the importServiceFunc return success, tasks will be sent to working list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, 1, len(mgr.workingTasks)) @@ -698,7 +644,7 @@ func TestImportManager_ImportJob(t *testing.T) { // row-based case, since the importServiceFunc return success for 1 task // the first task is sent to working list, and 1 task left in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, 1, len(mgr.workingTasks)) @@ -781,7 +727,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { } // each data node owns one task - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) for i := 0; i < len(dnList); i++ { resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -790,7 +736,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { } // all data nodes are busy, new task waiting in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) @@ -798,7 +744,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { // now all data nodes are free again, new task is executed instantly count = 0 - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, 0, len(mgr.pendingTasks)) @@ -862,7 +808,7 @@ func TestImportManager_TaskState(t *testing.T) { } // add 3 tasks, their ID is 10000, 10001, 10002, make sure updateTaskInfo() works correctly - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, 
callGetSegmentStates, nil, nil) mgr.importJob(context.TODO(), rowReq, colID, 0) rowReq.Files = []string{"f2.json"} mgr.importJob(context.TODO(), rowReq, colID, 0) @@ -976,7 +922,7 @@ func TestImportManager_AllocFail(t *testing.T) { }, }, nil } - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, 0, len(mgr.pendingTasks)) @@ -1044,7 +990,7 @@ func TestImportManager_ListAllTasks(t *testing.T) { } mockKv := memkv.NewMemoryKV() - mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, callGetSegmentStates, getCollectionName, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, callGetSegmentStates, getCollectionName, nil) // add 10 tasks for collection1, id from 1 to 10 file1 := "f1.json" @@ -1229,167 +1175,3 @@ func TestImportManager_isRowbased(t *testing.T) { assert.Nil(t, err) assert.False(t, rb) } - -func TestImportManager_checkIndexingDone(t *testing.T) { - ctx := context.Background() - - mgr := &importManager{ - callDescribeIndex: func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return nil, errors.New("error") - }, - callGetSegmentIndexState: func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error) { - return nil, errors.New("error") - }, - callGetSegmentStates: func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { - return nil, errors.New("error") - }, - } - - segmentsID := []typeutil.UniqueID{1, 2, 3} - - done, err := mgr.checkFlushDone(ctx, segmentsID) - assert.False(t, done) - assert.Error(t, err) - // check index of 3 segments - // callDescribeIndex() failed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } - - done, err = mgr.checkFlushDone(ctx, segmentsID) - assert.False(t, done) - assert.Error(t, err) - // callDescribeIndex() unexpected error - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } - mgr.callGetSegmentStates = func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { - return &datapb.GetSegmentStatesResponse{ - States: []*datapb.SegmentStateInfo{ - { - SegmentID: 1, - State: commonpb.SegmentState_Flushed, - }, - { - SegmentID: 1, - State: commonpb.SegmentState_Flushed, - }, - { - SegmentID: 1, - State: commonpb.SegmentState_Flushed, - }, - }, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - }, nil - } - - done, err = mgr.checkFlushDone(ctx, segmentsID) - assert.True(t, done) - assert.NoError(t, err) - // callDescribeIndex() 
index not exist - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.True(t, done) - assert.Nil(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexInfos: []*datapb.IndexInfo{ - { - State: commonpb.IndexState_Finished, - }, - }, - }, nil - } - - done, err = mgr.checkFlushDone(ctx, segmentsID) - assert.True(t, done) - assert.NoError(t, err) - // callGetSegmentIndexState() failed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Error(t, err) - - mgr.callGetSegmentIndexState = func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error) { - return []*datapb.SegmentIndexState{ - { - State: commonpb.IndexState_Finished, - }, - }, nil - } - - done, err = mgr.checkFlushDone(ctx, segmentsID) - assert.True(t, done) - assert.NoError(t, err) - // only 1 segment indexed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.False(t, done) - assert.Nil(t, err) - - mgr.callGetSegmentIndexState = func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error) { - return []*datapb.SegmentIndexState{ - { - State: commonpb.IndexState_Finished, - }, - { - State: commonpb.IndexState_Finished, - }, - { - State: commonpb.IndexState_Finished, - }, - }, nil - } - - done, err = mgr.checkFlushDone(ctx, segmentsID) - assert.True(t, done) - assert.NoError(t, err) - // all segments indexed - done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) - assert.True(t, done) - assert.Nil(t, err) - - mgr.callGetSegmentStates = func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { - return &datapb.GetSegmentStatesResponse{ - States: []*datapb.SegmentStateInfo{ - { - SegmentID: 1, - State: commonpb.SegmentState_Flushed, - }, - { - SegmentID: 1, - State: commonpb.SegmentState_Flushed, - }, - { - SegmentID: 1, - State: commonpb.SegmentState_Importing, - }, - }, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - }, nil - } - done, err = mgr.checkFlushDone(ctx, segmentsID) - assert.False(t, done) - assert.NoError(t, err) -} diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index d0472cf43d603..ac7a1e2ec0ce4 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -412,8 +412,6 @@ func (c *Core) initImportManager() error { f.NewMarkSegmentsDroppedFunc(), f.NewGetSegmentStatesFunc(), f.NewGetCollectionNameFunc(), - f.NewDescribeIndexFunc(), - f.NewGetSegmentIndexStateFunc(), f.NewUnsetIsImportingStateFunc(), ) c.importManager.init(c.ctx) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index fb91169c66e1a..9b6cdd6407434 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -934,7 +934,7 @@ func TestCore_GetImportState(t *testing.T) { t.Run("normal case", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil) resp, err := c.GetImportState(ctx, &milvuspb.GetImportStateRequest{ Task: 100, }) @@ -1018,7 +1018,7 @@ func TestCore_ListImportTasks(t *testing.T) { ctx := 
context.Background() c := newTestCore(withHealthyCode(), withMeta(meta)) - c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil) // list all tasks resp, err := c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{}) @@ -1166,14 +1166,6 @@ func TestCore_ReportImport(t *testing.T) { }, nil } - callDescribeIndex := func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error) { - return &datapb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } - callUnsetIsImportingState := func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1191,7 +1183,7 @@ func TestCore_ReportImport(t *testing.T) { t.Run("report complete import with task not found", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ TaskId: 101, State: commonpb.ImportState_ImportCompleted, @@ -1203,7 +1195,7 @@ func TestCore_ReportImport(t *testing.T) { t.Run("report import started state", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) c.importManager.loadFromTaskStore(true) c.importManager.sendOutTasks(ctx) resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ @@ -1226,8 +1218,7 @@ func TestCore_ReportImport(t *testing.T) { withTtSynchronizer(ticker), withDataCoord(dc)) c.broker = newServerBroker(c) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, - callDescribeIndex, nil, callUnsetIsImportingState) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, callUnsetIsImportingState) c.importManager.loadFromTaskStore(true) c.importManager.sendOutTasks(ctx)