enhance: Ensure ImportV2 waits for the index to be built and refine some logic (#31629) (#31733)

Feature Introduced:
1. Ensure ImportV2 waits for the index to be built

Enhancements Introduced:
1. Use local time for the timeout ts instead of allocating a ts from rootcoord.
2. Tighten the input file length check for binlog import.
3. Remove the duplicated manager in datanode.
4. Rename executor to scheduler in datanode.
5. Use a thread pool in the scheduler in datanode (a generic sketch of the pattern follows this list).
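
The datanode scheduler changes themselves sit in files that are not expanded in the diff below, so here is only a generic, self-contained Go sketch of the worker-pool pattern item 5 refers to; the pool type, queue size, and Submit/Close API are illustrative assumptions rather than the actual scheduler code.

```go
// Generic worker-pool sketch (illustrative only; not the datanode scheduler's
// actual implementation). A fixed number of goroutines drain a task channel,
// so task submission never spawns unbounded goroutines.
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func newPool(workers int) *pool {
	p := &pool{tasks: make(chan func(), 1024)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

func (p *pool) Submit(task func()) { p.tasks <- task }

func (p *pool) Close() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	p := newPool(4)
	for i := 0; i < 8; i++ {
		i := i
		p.Submit(func() { fmt.Println("executing import task", i) })
	}
	p.Close()
}
```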

issue: #28521

pr: #31629

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper authored Apr 1, 2024
1 parent 609674c commit 808a944
Showing 21 changed files with 306 additions and 252 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
@@ -440,6 +440,7 @@ dataCoord:
filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
waitForIndex: true # Indicates whether the import operation waits for the completion of index building.

enableGarbageCollection: true
gc:
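The new flag is consumed by the import checker further down in this commit; as a distilled sketch (assumed helper shape, not the real datacoord types), the gate it enables is simply:

```go
// Minimal sketch of the waitForIndex gate: an import job is only allowed to
// move to Completed once no imported segment remains unindexed. The real check
// lives in checkImportingJob (see the import_checker.go hunk below).
func importJobCanComplete(waitForIndex bool, unindexedSegments []int64) bool {
	if waitForIndex && len(unindexedSegments) > 0 {
		return false // keep the job in Importing; the checker re-evaluates on its next tick
	}
	return true
}
```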
78 changes: 37 additions & 41 deletions internal/datacoord/import_checker.go
@@ -39,13 +39,12 @@ type ImportChecker interface {
}

type importChecker struct {
meta *meta
broker broker.Broker
cluster Cluster
alloc allocator
sm Manager
imeta ImportMeta
buildIndexCh chan UniqueID
meta *meta
broker broker.Broker
cluster Cluster
alloc allocator
sm Manager
imeta ImportMeta

closeOnce sync.Once
closeChan chan struct{}
@@ -57,17 +56,15 @@ func NewImportChecker(meta *meta,
alloc allocator,
sm Manager,
imeta ImportMeta,
buildIndexCh chan UniqueID,
) ImportChecker {
return &importChecker{
meta: meta,
broker: broker,
cluster: cluster,
alloc: alloc,
sm: sm,
imeta: imeta,
buildIndexCh: buildIndexCh,
closeChan: make(chan struct{}),
meta: meta,
broker: broker,
cluster: cluster,
alloc: alloc,
sm: sm,
imeta: imeta,
closeChan: make(chan struct{}),
}
}

@@ -241,64 +238,63 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
}

func (c *importChecker) checkImportingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()),
zap.Int64("collectionID", job.GetCollectionID()))
tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, t := range tasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
return
}
}

unfinished := make([]int64, 0)
for _, task := range tasks {
segmentIDs := task.(*importTask).GetSegmentIDs()
for _, segmentID := range segmentIDs {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("cannot find segment, may be compacted", WrapTaskLog(task, zap.Int64("segmentID", segmentID))...)
continue
}
if segment.GetIsImporting() {
unfinished = append(unfinished, segmentID)
}
}
}
segmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := c.sm.FlushImportSegments(ctx, job.GetCollectionID(), unfinished)
if err != nil {
log.Warn("flush imported segments failed", zap.Int64("jobID", job.GetJobID()),
zap.Int64("collectionID", job.GetCollectionID()), zap.Int64s("segments", unfinished), zap.Error(err))
// Verify completion of index building for imported segments.
unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs)
if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 {
log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
return
}

unfinished := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("cannot find segment, may be compacted", zap.Int64("segmentID", segmentID))
return false
}
return segment.GetIsImporting()
})

channels, err := c.meta.GetSegmentsChannels(unfinished)
if err != nil {
log.Warn("get segments channels failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
log.Warn("get segments channels failed", zap.Error(err))
return
}
for _, segmentID := range unfinished {
c.buildIndexCh <- segmentID // accelerate index building
channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
if channelCP == nil {
log.Warn("nil channel checkpoint", zap.Int64("jobID", job.GetJobID()))
log.Warn("nil channel checkpoint")
return
}
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
op2 := UpdateDmlPosition(segmentID, channelCP)
op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
if err != nil {
log.Warn("update import segment failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
log.Warn("update import segment failed", zap.Error(err))
return
}
}

completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
log.Warn("failed to update job state to Completed", zap.Error(err))
return
}
log.Info("import job completed")
}

func (c *importChecker) tryFailingTasks(job ImportJob) {
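The rewrite of checkImportingJob above swaps the hand-rolled loops for helpers from github.com/samber/lo. A tiny standalone illustration of the two helpers it uses, with toy data rather than Milvus types:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// FlatMap: expand each task into its segment IDs and concatenate the results,
	// mirroring how checkImportingJob collects segmentIDs from all import tasks.
	tasks := [][]int64{{1, 2}, {3}, {4, 5}}
	segmentIDs := lo.FlatMap(tasks, func(t []int64, _ int) []int64 { return t })
	fmt.Println(segmentIDs) // [1 2 3 4 5]

	// Filter: keep only the segments that are still importing.
	stillImporting := map[int64]bool{2: true, 4: true}
	unfinished := lo.Filter(segmentIDs, func(id int64, _ int) bool { return stillImporting[id] })
	fmt.Println(unfinished) // [2 4]
}
```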
5 changes: 1 addition & 4 deletions internal/datacoord/import_checker_test.go
@@ -65,9 +65,8 @@ func (s *ImportCheckerSuite) SetupTest() {

broker := broker2.NewMockBroker(s.T())
sm := NewMockManager(s.T())
buildIndexCh := make(chan UniqueID, 1024)

checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta, buildIndexCh).(*importChecker)
checker := NewImportChecker(meta, broker, cluster, alloc, sm, imeta).(*importChecker)
s.checker = checker

job := &importJob{
@@ -178,8 +177,6 @@ func (s *ImportCheckerSuite) TestCheckJob() {
s.Equal(true, segment.GetIsImporting())
}
}
sm := s.checker.sm.(*MockManager)
sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil)
26 changes: 18 additions & 8 deletions internal/datacoord/import_scheduler.go
@@ -24,6 +24,7 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -45,6 +46,8 @@ type importScheduler struct {
alloc allocator
imeta ImportMeta

buildIndexCh chan UniqueID

closeOnce sync.Once
closeChan chan struct{}
}
@@ -53,13 +56,15 @@ func NewImportScheduler(meta *meta,
cluster Cluster,
alloc allocator,
imeta ImportMeta,
buildIndexCh chan UniqueID,
) ImportScheduler {
return &importScheduler{
meta: meta,
cluster: cluster,
alloc: alloc,
imeta: imeta,
closeChan: make(chan struct{}),
meta: meta,
cluster: cluster,
alloc: alloc,
imeta: imeta,
buildIndexCh: buildIndexCh,
closeChan: make(chan struct{}),
}
}

@@ -157,7 +162,7 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
}(nodeID)
}
wg.Wait()
log.Info("peek slots done", zap.Any("nodeSlots", nodeSlots))
log.Debug("peek slots done", zap.Any("nodeSlots", nodeSlots))
return nodeSlots
}

@@ -289,12 +294,17 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
return
}
op := ReplaceBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil)
err = s.meta.UpdateSegmentsInfo(op)
op1 := ReplaceBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), nil)
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = s.meta.UpdateSegmentsInfo(op1, op2)
if err != nil {
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
return
}
select {
case s.buildIndexCh <- info.GetSegmentID(): // accelerate index building
default:
}
}
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime))
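The select with a default branch added above is Go's standard non-blocking send: if buildIndexCh is already full, the hint is dropped instead of stalling the scheduler (as the in-code comment notes, the channel is only an acceleration). A standalone illustration of the pattern:

```go
package main

import "fmt"

func main() {
	notifyCh := make(chan int64, 1)

	// Non-blocking send: enqueue the segment ID if there is room, otherwise drop
	// the hint instead of blocking the caller.
	for _, segmentID := range []int64{100, 101, 102} {
		select {
		case notifyCh <- segmentID:
			fmt.Println("queued build-index hint for segment", segmentID)
		default:
			fmt.Println("channel full, dropped hint for segment", segmentID)
		}
	}
}
```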
3 changes: 2 additions & 1 deletion internal/datacoord/import_scheduler_test.go
@@ -67,7 +67,8 @@ func (s *ImportSchedulerSuite) SetupTest() {
})
s.imeta, err = NewImportMeta(s.catalog)
s.NoError(err)
s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler)
buildIndexCh := make(chan UniqueID, 1024)
s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta, buildIndexCh).(*importScheduler)
}

func (s *ImportSchedulerSuite) TestProcessPreImport() {
13 changes: 9 additions & 4 deletions internal/datacoord/import_util.go
@@ -18,6 +18,7 @@ package datacoord

import (
"context"
"fmt"
"path"
"sort"
"time"
@@ -349,7 +350,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, i
if totalSegment != 0 {
completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment)
}
return importingProgress*0.8 + completedProgress*0.2, importedRows, totalRows
return importingProgress*0.5 + completedProgress*0.5, importedRows, totalRows
}

func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) {
@@ -361,11 +362,11 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalp

case internalpb.ImportJobState_PreImporting:
progress := getPreImportingProgress(jobID, imeta)
return 10 + int64(progress*40), internalpb.ImportJobState_Importing, 0, 0, ""
return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, ""

case internalpb.ImportJobState_Importing:
progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta)
return 10 + 40 + int64(progress*50), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
return 10 + 30 + int64(progress*60), internalpb.ImportJobState_Importing, importedRows, totalRows, ""

case internalpb.ImportJobState_Completed:
totalRows := int64(0)
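
After the re-weighting above, the stage budgets are the fixed 10% earned before pre-importing, 30% for pre-importing, and 60% for importing, with getImportingProgress itself now an even 50/50 split between imported rows and segments whose is-importing flag has been cleared. A small sketch of the resulting arithmetic (names are illustrative, not the real functions):

```go
// Worked example of the new progress weighting in GetJobProgress /
// getImportingProgress (stage names abbreviated; this is not the real code).
package main

import "fmt"

func jobProgress(preImportDone, rowProgress, segmentsFlippedRatio float64) int64 {
	const (
		baseBudget      = 10.0 // stages before pre-importing
		preImportBudget = 30.0
		importBudget    = 60.0
	)
	importing := rowProgress*0.5 + segmentsFlippedRatio*0.5
	return int64(baseBudget + preImportDone*preImportBudget + importing*importBudget)
}

func main() {
	// Pre-import finished, half of the rows imported, no segments flipped yet:
	// 10 + 30 + (0.5*0.5+0)*60 = 55, matching the expectation in the updated test.
	fmt.Println(jobProgress(1.0, 0.5, 0.0)) // 55
}
```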
@@ -428,9 +429,13 @@ func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error {
}

func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager, importFile *internalpb.ImportFile) ([]*internalpb.ImportFile, error) {
if len(importFile.GetPaths()) < 1 {
if len(importFile.GetPaths()) == 0 {
return nil, merr.WrapErrImportFailed("no insert binlogs to import")
}
if len(importFile.GetPaths()) > 2 {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("too many input paths for binlog import. "+
"Valid paths length should be one or two, but got paths:%s", importFile.GetPaths()))
}

insertPrefix := importFile.GetPaths()[0]
segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false)
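With the new length check, a binlog import request must carry either one path (the insert-log prefix) or two paths (insert-log plus delta-log prefix); anything else is rejected. A sketch of inputs under that rule (the prefixes are illustrative, mirroring the test below):

```go
// Illustrative only: shows which Paths layouts pass the new validation in
// ListBinlogsAndGroupBySegment (assumes the internalpb import from this package).
func examplePaths() []*internalpb.ImportFile {
	valid1 := &internalpb.ImportFile{ // insert-log prefix only
		Paths: []string{"backup/bak1/data/insert_log/435978159196147009/435978159196147010"},
	}
	valid2 := &internalpb.ImportFile{ // insert-log prefix plus delta-log prefix
		Paths: []string{
			"backup/bak1/data/insert_log/435978159196147009/435978159196147010",
			"backup/bak1/data/delta_log/435978159196147009/435978159196147010",
		},
	}
	// Rejected by the new check: empty Paths, or more than two entries.
	return []*internalpb.ImportFile{valid1, valid2}
}
```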
75 changes: 45 additions & 30 deletions internal/datacoord/import_util_test.go
@@ -322,38 +322,53 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {
deltaPrefix = "mock-delta-binlog-prefix"
)

segmentInsertPaths := []string{
// segment 435978159261483008
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008",
// segment 435978159261483009
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009",
}
t.Run("normal case", func(t *testing.T) {
segmentInsertPaths := []string{
// segment 435978159261483008
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483008",
// segment 435978159261483009
"backup/bak1/data/insert_log/435978159196147009/435978159196147010/435978159261483009",
}

segmentDeltaPaths := []string{
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008",
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009",
}
segmentDeltaPaths := []string{
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483008",
"backup/bak1/data/delta_log/435978159196147009/435978159196147010/435978159261483009",
}

ctx := context.Background()
cm := mocks2.NewChunkManager(t)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)
cm := mocks2.NewChunkManager(t)
cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)

file := &internalpb.ImportFile{
Id: 1,
Paths: []string{insertPrefix, deltaPrefix},
}
file := &internalpb.ImportFile{
Id: 1,
Paths: []string{insertPrefix, deltaPrefix},
}

files, err := ListBinlogsAndGroupBySegment(ctx, cm, file)
assert.NoError(t, err)
assert.Equal(t, 2, len(files))
for _, f := range files {
assert.Equal(t, 2, len(f.GetPaths()))
for _, p := range f.GetPaths() {
segmentID := path.Base(p)
assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009")
files, err := ListBinlogsAndGroupBySegment(context.Background(), cm, file)
assert.NoError(t, err)
assert.Equal(t, 2, len(files))
for _, f := range files {
assert.Equal(t, 2, len(f.GetPaths()))
for _, p := range f.GetPaths() {
segmentID := path.Base(p)
assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009")
}
}
}
})

t.Run("invalid input", func(t *testing.T) {
file := &internalpb.ImportFile{
Paths: []string{},
}
_, err := ListBinlogsAndGroupBySegment(context.Background(), nil, file)
assert.Error(t, err)
t.Logf("%s", err)

file.Paths = []string{insertPrefix, deltaPrefix, "dummy_prefix"}
_, err = ListBinlogsAndGroupBySegment(context.Background(), nil, file)
assert.Error(t, err)
t.Logf("%s", err)
})
}

func TestImportUtil_GetImportProgress(t *testing.T) {
@@ -517,15 +532,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40), progress)
assert.Equal(t, int64(10+30), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)

// importing state, segmentImportedRows/totalRows = 0.5
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40+40*0.5), progress)
assert.Equal(t, int64(10+30+30*0.5), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)

@@ -547,7 +562,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress)
assert.Equal(t, int64(float32(10+30+30+30*2/6)), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)

