Remove index checking phase for bulk insert (#21578)
/kind improvement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
soothing-rain authored Jan 9, 2023
1 parent 86e626b commit 23ceb9c
Showing 5 changed files with 57 additions and 419 deletions.
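This commit collapses the bulk-insert completion path from two phases into one: RootCoord no longer waits for segment indexes to be built before finishing an import task, it only waits for the task's segments to be flushed. A rough sketch of the state flow implied by the diff below (only the states touched by this commit are shown):

// Before: two flip loops.
//   ImportPersisted --(all segments flushed)--> ImportFlushed --(all indexes built)--> ImportCompleted
//
// After: the index-checking phase is removed; one flip loop remains.
//   ImportPersisted --(all segments flushed)--> ImportCompleted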
12 changes: 10 additions & 2 deletions internal/datacoord/segment_manager.go
@@ -436,6 +436,10 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
ret = append(ret, id)
continue
}
if info.State == commonpb.SegmentState_Flushing ||
info.State == commonpb.SegmentState_Flushed {
continue
}
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return nil, err
}
@@ -526,7 +530,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
continue
}
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State == commonpb.SegmentState_Sealed {
if info.State == commonpb.SegmentState_Sealed ||
info.State == commonpb.SegmentState_Flushing ||
info.State == commonpb.SegmentState_Flushed {
continue
}
// change shouldSeal to segment seal policy logic
@@ -543,7 +549,9 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
for _, policy := range s.channelSealPolicies {
vs := policy(channel, segmentInfos, ts)
for _, info := range vs {
if info.State == commonpb.SegmentState_Sealed {
if info.State == commonpb.SegmentState_Sealed ||
info.State == commonpb.SegmentState_Flushing ||
info.State == commonpb.SegmentState_Flushed {
continue
}
if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil {
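The segment_manager.go hunks above all add the same guard: a segment that is already Flushing or Flushed is skipped rather than sealed again. A minimal sketch of that condition pulled out into a helper (the helper name is illustrative only; the commit inlines the check at each call site):

// canSkipSealing reports whether a segment no longer needs an explicit seal,
// i.e. it is already sealed, being flushed, or flushed.
func canSkipSealing(state commonpb.SegmentState) bool {
	return state == commonpb.SegmentState_Sealed ||
		state == commonpb.SegmentState_Flushing ||
		state == commonpb.SegmentState_Flushed
}

In SealAllSegments the new check simply skips Flushing/Flushed segments before calling meta.SetState, while in both tryToSealSegment loops it extends the existing Sealed check.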
175 changes: 17 additions & 158 deletions internal/rootcoord/import_manager.go
@@ -40,7 +40,7 @@ import (
)

const (
MaxPendingCount = 5000 // TODO: Make this configurable.
MaxPendingCount = 65536 // TODO: Make this configurable.
delimiter = "/"
)

@@ -54,16 +54,11 @@ var checkPendingTasksInterval = 60 * 1000
var cleanUpLoopInterval = 5 * 60 * 1000

// flipPersistedTaskInterval is the default interval to loop through tasks and check if their states needs to be
// flipped/updated from `ImportPersisted` to `ImportFlushed`.
// flipped/updated from `ImportPersisted` to `ImportCompleted`.
// default 2 * 1000 milliseconds (2 seconds)
// TODO: Make this configurable.
var flipPersistedTaskInterval = 2 * 1000

// flipFlushedTaskInterval is the default interval to loop through tasks and check if their states needs to be
// flipped/updated from `ImportFlushed` to `ImportCompleted`.
// default 5 * 1000 milliseconds (5 seconds)
var flipFlushedTaskInterval = 5 * 1000

// importManager manager for import tasks
type importManager struct {
ctx context.Context // reserved
@@ -85,8 +80,6 @@ type importManager struct {
getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error)
callMarkSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error)
callGetSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
callDescribeIndex func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error)
callGetSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error)
callUnsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
}

@@ -97,8 +90,6 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error),
getSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error),
getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error),
describeIndex func(ctx context.Context, colID UniqueID) (*datapb.DescribeIndexResponse, error),
getSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*datapb.SegmentIndexState, error),
unsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)) *importManager {
mgr := &importManager{
ctx: ctx,
@@ -115,8 +106,6 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
callMarkSegmentsDropped: markSegmentsDropped,
callGetSegmentStates: getSegmentStates,
getCollectionName: getCollectionName,
callDescribeIndex: describeIndex,
callGetSegmentIndexState: getSegmentIndexState,
callUnsetIsImportingState: unsetIsImportingState,
}
return mgr
@@ -158,9 +147,7 @@ func (m *importManager) sendOutTasksLoop(wg *sync.WaitGroup) {
func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) {
defer wg.Done()
flipPersistedTicker := time.NewTicker(time.Duration(flipPersistedTaskInterval) * time.Millisecond)
flipFlushedTicker := time.NewTicker(time.Duration(flipFlushedTaskInterval) * time.Millisecond)
defer flipPersistedTicker.Stop()
defer flipFlushedTicker.Stop()
for {
select {
case <-m.ctx.Done():
@@ -171,11 +158,6 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) {
if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil {
log.Error("failed to flip ImportPersisted task", zap.Error(err))
}
case <-flipFlushedTicker.C:
log.Debug("start trying to flip ImportFlushed task")
if err := m.loadAndFlipFlushedTasks(m.ctx); err != nil {
log.Error("failed to flip ImportPersisted task", zap.Error(err))
}
}
}
}
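With the ImportFlushed phase gone, the flip loop above only needs the single flipPersistedTicker. A trimmed sketch of the loop after this change (the log messages are illustrative, and any other tickers the full loop may carry are omitted):

func (m *importManager) flipTaskStateLoopSketch(wg *sync.WaitGroup) {
	defer wg.Done()
	// One ticker is enough now: eligible tasks jump from ImportPersisted straight to ImportCompleted.
	flipPersistedTicker := time.NewTicker(time.Duration(flipPersistedTaskInterval) * time.Millisecond)
	defer flipPersistedTicker.Stop()
	for {
		select {
		case <-m.ctx.Done():
			log.Info("import manager context done, exit flipTaskStateLoop")
			return
		case <-flipPersistedTicker.C:
			if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil {
				log.Error("failed to flip ImportPersisted task", zap.Error(err))
			}
		}
	}
}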
@@ -285,7 +267,7 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
}

// loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to
// `ImportFlushed` if eligible.
// `ImportCompleted` if eligible.
func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
var importTasks []*datapb.ImportTaskInfo
var err error
@@ -295,9 +277,9 @@ func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
}

for _, task := range importTasks {
// Checking if ImportPersisted --> ImportFlushed ready.
// Checking if ImportPersisted --> ImportCompleted ready.
if task.GetState().GetStateCode() == commonpb.ImportState_ImportPersisted {
log.Info("<ImportPersisted> task found, checking if it is eligible to become <ImportFlushed>",
log.Info("<ImportPersisted> task found, checking if it is eligible to become <ImportCompleted>",
zap.Int64("task ID", task.GetId()))
importTask := m.getTaskState(task.GetId())

@@ -312,37 +294,6 @@ func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
return nil
}

// loadAndFlipFlushedTasks checks every import task in `ImportFlushed` state and flips their import state to
// `ImportComplete` if eligible.
func (m *importManager) loadAndFlipFlushedTasks(ctx context.Context) error {
var importTasks []*datapb.ImportTaskInfo
var err error
if importTasks, err = m.loadFromTaskStore(false); err != nil {
log.Error("failed to load from task store", zap.Error(err))
return err
}

for _, task := range importTasks {
if task.GetState().GetStateCode() == commonpb.ImportState_ImportFlushed {
log.Info("<ImportFlushed> task found, checking if it is eligible to become <ImportCompleted>",
zap.Int64("task ID", task.GetId()))
importTask := m.getTaskState(task.GetId())

// TODO: if collection or partition has been dropped before the task complete,
// we need to set the task to failed, because the checkIndexingDone() cannot know
// whether the collection has been dropped.

// if this method failed, skip this task, try again in next round
if err = m.flipTaskIndexState(ctx, importTask); err != nil {
log.Error("failed to flip task index state",
zap.Int64("task ID", task.GetId()),
zap.Error(err))
}
}
}
return nil
}

func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *milvuspb.GetImportStateResponse, dataNodeID int64) error {
ok, err := m.checkFlushDone(ctx, importTask.GetSegmentIds())
if err != nil {
@@ -362,62 +313,32 @@ func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *mi
zap.Int64("task ID", importTask.GetId()))

}()
if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportFlushed); err != nil {
log.Error("failed to set import task state",
zap.Int64("task ID", importTask.GetId()),
zap.Any("target state", commonpb.ImportState_ImportFlushed),
zap.Error(err))
return err
}
if err = m.sendOutTasks(m.ctx); err != nil {
log.Error("fail to send out import task to DataNodes",
zap.Int64("task ID", importTask.GetId()))
// Unset isImporting flag.
if m.callUnsetIsImportingState == nil {
log.Error("callUnsetIsImportingState function of importManager is nil")
return fmt.Errorf("failed to describe index: segment state method of import manager is nil")
}
}
return nil
}

func (m *importManager) flipTaskIndexState(ctx context.Context, importTask *milvuspb.GetImportStateResponse) error {
ok, err := m.checkIndexingDone(ctx, importTask.GetCollectionId(), importTask.GetSegmentIds())
if err != nil {
log.Error("an error occurred while checking index state of segments",
zap.Int64("task ID", importTask.GetId()),
zap.Error(err))
return err
}
if ok {
_, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
SegmentIds: importTask.GetSegmentIds(),
})
if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportCompleted); err != nil {
log.Error("failed to set import task state",
zap.Int64("task ID", importTask.GetId()),
zap.Any("target state", commonpb.ImportState_ImportCompleted),
zap.Error(err))
return err
}
log.Info("indexes are successfully built and the import task has complete!",
zap.Int64("task ID", importTask.GetId()))
log.Info("now start unsetting isImporting state of segments",
zap.Int64("task ID", importTask.GetId()),
zap.Int64s("segment IDs", importTask.GetSegmentIds()))
// Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state.
if m.callUnsetIsImportingState == nil {
log.Error("callUnsetIsImportingState function of importManager is nil")
return fmt.Errorf("failed to describe index: segment state method of import manager is nil")
}
status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
SegmentIds: importTask.GetSegmentIds(),
})
if err != nil {
log.Error("failed to unset importing state of all segments (could be partial failure)",
zap.Error(err))
return err
}
if status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("failed to unset importing state of all segments (could be partial failure)",
zap.Error(errors.New(status.GetReason())))
return errors.New(status.GetReason())
// Start working on new bulk insert tasks.
if err = m.sendOutTasks(m.ctx); err != nil {
log.Error("fail to send out import task to DataNodes",
zap.Int64("task ID", importTask.GetId()))
}
}

return nil
}
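Taken together, the new flipTaskFlushedState body performs the whole completion in one pass once every segment reports flushed. A condensed sketch of that flow (busy-DataNode bookkeeping, nil-callback guards, and status-code checks from the real code are trimmed; this is not the verbatim method):

// completeFlushedTask is an illustrative condensation of the flow above.
func (m *importManager) completeFlushedTask(ctx context.Context, importTask *milvuspb.GetImportStateResponse) error {
	done, err := m.checkFlushDone(ctx, importTask.GetSegmentIds())
	if err != nil || !done {
		return err // either the check failed or some segments are still unflushed
	}
	// Clear the segments' isImporting flag now that the task is finishing.
	if _, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
		SegmentIds: importTask.GetSegmentIds(),
	}); err != nil {
		return err
	}
	// No index check any more: mark the task ImportCompleted directly.
	if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportCompleted); err != nil {
		return err
	}
	// Free the slot and start working on the next pending bulk-insert task.
	return m.sendOutTasks(m.ctx)
}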

@@ -446,68 +367,6 @@ func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) (
return true, nil
}

// checkIndexingDone checks if indexes are successfully built on segments in `allSegmentIDs`.
// It returns error on errors. It returns true if indexes are successfully built on all segments and returns false otherwise.
func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID, allSegmentIDs []UniqueID) (bool, error) {
if m.callDescribeIndex == nil {
log.Error("callDescribeIndex function of importManager is nil")
return false, fmt.Errorf("failed to describe index: describe index method of import manager is nil")
}

// Check if collection has indexed fields.
var descIdxResp *datapb.DescribeIndexResponse
var err error
if descIdxResp, err = m.callDescribeIndex(ctx, collID); err != nil {
log.Error("failed to describe index",
zap.Int64("collection ID", collID),
zap.Error(err))
return false, err
}
if descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success &&
descIdxResp.GetStatus().GetErrorCode() != commonpb.ErrorCode_IndexNotExist {
log.Error("failed to describe index",
zap.Int64("collection ID", collID),
zap.String("reason", descIdxResp.GetStatus().GetReason()))
return false, errors.New(descIdxResp.GetStatus().GetReason())
}
log.Info("index info retrieved for collection",
zap.Int64("collection ID", collID),
zap.Any("index info", descIdxResp.GetIndexInfos()))
if descIdxResp.GetStatus().GetErrorCode() == commonpb.ErrorCode_IndexNotExist ||
len(descIdxResp.GetIndexInfos()) == 0 {
log.Info("index doesn't exist for collection",
zap.Int64("collection ID", collID))
return true, nil
}
indexedSegmentCount := len(allSegmentIDs)
for _, indexInfo := range descIdxResp.GetIndexInfos() {
states, err := m.callGetSegmentIndexState(ctx, collID, indexInfo.GetIndexName(), allSegmentIDs)
if err != nil {
log.Error("failed to get index state in checkIndexingDone", zap.Error(err))
return false, err
}

// Count the # of segments with finished index.
ct := 0
for _, s := range states {
if s.State == commonpb.IndexState_Finished {
ct++
}
}

if ct < indexedSegmentCount {
indexedSegmentCount = ct
}
}

log.Info("segment indexing state checked",
zap.Int64s("segments checked", allSegmentIDs),
zap.Int("# of segments with complete index", indexedSegmentCount),
zap.Int64("collection ID", collID),
)
return len(allSegmentIDs) == indexedSegmentCount, nil
}
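The removed checkIndexingDone above was the only consumer of callDescribeIndex and callGetSegmentIndexState, which is why both callbacks also disappear from the importManager struct and constructor earlier in this diff. Task completion now hinges on checkFlushDone alone, whose tail is visible near the top of this hunk. For context, a hedged reconstruction of what such a flush check can look like, built on the callGetSegmentStates callback shown earlier; the request/response field names are assumptions, not a verbatim copy of the Milvus implementation:

func (m *importManager) checkFlushDoneSketch(ctx context.Context, segIDs []UniqueID) (bool, error) {
	resp, err := m.callGetSegmentStates(ctx, &datapb.GetSegmentStatesRequest{
		SegmentIDs: segIDs,
	})
	if err != nil {
		return false, err
	}
	// Flush is done only when every segment of the import task reports Flushed.
	for _, info := range resp.GetStates() {
		if info.GetState() != commonpb.SegmentState_Flushed {
			return false, nil
		}
	}
	return true, nil
}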

func (m *importManager) isRowbased(files []string) (bool, error) {
isRowBased := false
for _, filePath := range files {
@@ -910,7 +769,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse
// other in-progress tasks as failed, when `load2Mem` is set to `true`.
// loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`.
func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) {
log.Info("import manager starts loading from Etcd")
log.Debug("import manager starts loading from Etcd")
_, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath.GetValue())
if err != nil {
log.Error("import manager failed to load from Etcd", zap.Error(err))
