diff --git a/pkg/disttask/framework/planner/plan.go b/pkg/disttask/framework/planner/plan.go index fae9b2b9b9ede..e3d94a7c95a10 100644 --- a/pkg/disttask/framework/planner/plan.go +++ b/pkg/disttask/framework/planner/plan.go @@ -37,6 +37,7 @@ type PlanCtx struct { PreviousSubtaskMetas map[proto.Step][][]byte GlobalSort bool NextTaskStep proto.Step + ExecuteNodesCnt int } // LogicalPlan represents a logical plan in distribute framework. diff --git a/pkg/disttask/importinto/dispatcher.go b/pkg/disttask/importinto/dispatcher.go index b141f1e46c730..ab8fd81bec0ee 100644 --- a/pkg/disttask/importinto/dispatcher.go +++ b/pkg/disttask/importinto/dispatcher.go @@ -299,12 +299,18 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch( return nil, errors.Errorf("unknown step %d", gTask.Step) } + eligibleInstances, err := dsp.GetEligibleInstances(ctx, gTask) + if err != nil { + logger.Warn("failed to get eligible instances", zap.Error(err)) + } + planCtx := planner.PlanCtx{ Ctx: ctx, TaskID: gTask.ID, PreviousSubtaskMetas: previousSubtaskMetas, GlobalSort: dsp.GlobalSort, NextTaskStep: nextStep, + ExecuteNodesCnt: len(eligibleInstances), } logicalPlan := &LogicalPlan{} if err := logicalPlan.FromTaskMeta(gTask.Meta); err != nil { diff --git a/pkg/disttask/importinto/planner.go b/pkg/disttask/importinto/planner.go index 6ea23054b4766..53ffe2e8c3665 100644 --- a/pkg/disttask/importinto/planner.go +++ b/pkg/disttask/importinto/planner.go @@ -109,7 +109,7 @@ func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.Physical // we only generate needed plans for the next step. switch planCtx.NextTaskStep { case StepImport, StepEncodeAndSort: - specs, err := generateImportSpecs(planCtx.Ctx, p) + specs, err := generateImportSpecs(planCtx, p) if err != nil { return nil, err } @@ -249,7 +249,7 @@ func buildController(plan *importer.Plan, stmt string) (*importer.LoadDataContro return controller, nil } -func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]planner.PipelineSpec, error) { +func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.PipelineSpec, error) { var chunkMap map[int32][]Chunk if len(p.ChunkMap) > 0 { chunkMap = p.ChunkMap @@ -258,11 +258,12 @@ func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]planner.Pipelin if err2 != nil { return nil, err2 } - if err2 = controller.InitDataFiles(ctx); err2 != nil { + if err2 = controller.InitDataFiles(pCtx.Ctx); err2 != nil { return nil, err2 } - engineCheckpoints, err2 := controller.PopulateChunks(ctx) + controller.SetExecuteNodeCnt(pCtx.ExecuteNodesCnt) + engineCheckpoints, err2 := controller.PopulateChunks(pCtx.Ctx) if err2 != nil { return nil, err2 } diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 07f6fb8622569..5e1afc2d8332b 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -88,7 +88,7 @@ go_test( embed = [":importer"], flaky = True, race = "on", - shard_count = 19, + shard_count = 20, deps = [ "//br/pkg/errors", "//br/pkg/lightning/backend/encode", diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 4d4cc844f85f3..2dd931e828658 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -264,6 +264,8 @@ type LoadDataController struct { dataFiles []*mydump.SourceFileMeta // GlobalSortStore is used to store sorted data when using global sort. GlobalSortStore storage.ExternalStorage + // ExecuteNodesCnt is the count of execute nodes. + ExecuteNodesCnt int } func getImportantSysVars(sctx sessionctx.Context) map[string]string { @@ -452,10 +454,11 @@ func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*Load fullTableName := tbl.Meta().Name.String() logger := log.L().With(zap.String("table", fullTableName)) c := &LoadDataController{ - Plan: plan, - ASTArgs: astArgs, - Table: tbl, - logger: logger, + Plan: plan, + ASTArgs: astArgs, + Table: tbl, + logger: logger, + ExecuteNodesCnt: 1, } if err := c.checkFieldParams(); err != nil { return nil, err diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index ace1df765e66c..016c688aeb1f4 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -299,12 +299,11 @@ func (ti *TableImporter) getKVEncoder(chunk *checkpoints.ChunkCheckpoint) (KVEnc return NewTableKVEncoder(cfg, ti) } -func (e *LoadDataController) getAdjustedMaxEngineSize() int64 { +func (e *LoadDataController) calculateSubtaskCnt() int { // we want to split data files into subtask of size close to MaxEngineSize to reduce range overlap, // and evenly distribute them to subtasks. - // so we adjust MaxEngineSize to make sure each subtask has a similar amount of data to import. - // we calculate subtask count first by round(TotalFileSize / maxEngineSize), then adjust maxEngineSize - // + // we calculate subtask count first by round(TotalFileSize / maxEngineSize) + // AllocateEngineIDs is using ceil() to calculate subtask count, engine size might be too small in some case, // such as 501G data, maxEngineSize will be about 250G, so we don't relay on it. // see https://github.com/pingcap/tidb/blob/b4183e1dc9bb01fb81d3aa79ca4b5b74387c6c2a/br/pkg/lightning/mydump/region.go#L109 @@ -315,13 +314,33 @@ func (e *LoadDataController) getAdjustedMaxEngineSize() int64 { // [750, 1250) 2 [375, 625) // [1250, 1750) 3 [416, 583) // [1750, 2250) 4 [437, 562) - maxEngineSize := int64(e.MaxEngineSize) + var ( + subtaskCount float64 + maxEngineSize = int64(e.MaxEngineSize) + ) if e.TotalFileSize <= maxEngineSize { - return e.TotalFileSize + subtaskCount = 1 + } else { + subtaskCount = math.Round(float64(e.TotalFileSize) / float64(e.MaxEngineSize)) + } + + // for global sort task, since there is no overlap, + // we make sure subtask count is a multiple of execute nodes count + if e.IsGlobalSort() && e.ExecuteNodesCnt > 0 { + subtaskCount = math.Ceil(subtaskCount/float64(e.ExecuteNodesCnt)) * float64(e.ExecuteNodesCnt) } - subtaskCount := math.Round(float64(e.TotalFileSize) / float64(maxEngineSize)) - adjusted := math.Ceil(float64(e.TotalFileSize) / subtaskCount) - return int64(adjusted) + return int(subtaskCount) +} + +func (e *LoadDataController) getAdjustedMaxEngineSize() int64 { + subtaskCount := e.calculateSubtaskCnt() + // we adjust MaxEngineSize to make sure each subtask has a similar amount of data to import. + return int64(math.Ceil(float64(e.TotalFileSize) / float64(subtaskCount))) +} + +// SetExecuteNodeCnt sets the execute node count. +func (e *LoadDataController) SetExecuteNodeCnt(cnt int) { + e.ExecuteNodesCnt = cnt } // PopulateChunks populates chunks from table regions. diff --git a/pkg/executor/importer/table_import_test.go b/pkg/executor/importer/table_import_test.go index 3fc5688a7d0db..e72b6c009f91c 100644 --- a/pkg/executor/importer/table_import_test.go +++ b/pkg/executor/importer/table_import_test.go @@ -80,29 +80,92 @@ func TestPrepareSortDir(t *testing.T) { require.Nil(t, info) } +func TestCalculateSubtaskCnt(t *testing.T) { + tests := []struct { + totalSize int64 + maxEngineSize config.ByteSize + executeNodeCnt int + cloudStorageURL string + want int + }{ + {1, 500, 0, "", 1}, + {499, 500, 1, "", 1}, + {500, 500, 2, "", 1}, + {749, 500, 3, "", 1}, + {750, 500, 4, "", 2}, + {1249, 500, 5, "", 2}, + {1250, 500, 6, "", 3}, + {100, 30, 7, "", 3}, + + {1, 500, 0, "url", 1}, + {499, 500, 1, "url", 1}, + {500, 500, 2, "url", 2}, + {749, 500, 3, "url", 3}, + {750, 500, 4, "url", 4}, + {1249, 500, 5, "url", 5}, + {1250, 500, 6, "url", 6}, + {100, 30, 2, "url", 4}, + {400, 99, 3, "url", 6}, + {500, 100, 5, "url", 5}, + {500, 200, 5, "url", 5}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("%d/%d", tt.totalSize, tt.maxEngineSize), func(t *testing.T) { + e := &LoadDataController{ + Plan: &Plan{ + MaxEngineSize: tt.maxEngineSize, + TotalFileSize: tt.totalSize, + CloudStorageURI: tt.cloudStorageURL, + }, + ExecuteNodesCnt: tt.executeNodeCnt, + } + if got := e.calculateSubtaskCnt(); got != tt.want { + t.Errorf("calculateSubtaskCnt() = %v, want %v", got, tt.want) + } + }) + } +} + func TestLoadDataControllerGetAdjustedMaxEngineSize(t *testing.T) { tests := []struct { - totalSize int64 - maxEngineSize config.ByteSize - want int64 + totalSize int64 + maxEngineSize config.ByteSize + executeNodeCnt int + cloudStorageURL string + want int64 }{ - {1, 500, 1}, - {499, 500, 499}, - {500, 500, 500}, - {749, 500, 749}, - {750, 500, 375}, - {1249, 500, 625}, - {1250, 500, 417}, + {1, 500, 0, "", 1}, + {499, 500, 1, "", 499}, + {500, 500, 2, "", 500}, + {749, 500, 3, "", 749}, + {750, 500, 4, "", 375}, + {1249, 500, 5, "", 625}, + {1250, 500, 6, "", 417}, // ceil(100/3) - {100, 30, 34}, + {100, 30, 7, "", 34}, + + {1, 500, 0, "url", 1}, + {499, 500, 1, "url", 499}, + {500, 500, 2, "url", 250}, + {749, 500, 3, "url", 250}, + {750, 500, 4, "url", 188}, + {1249, 500, 5, "url", 250}, + {1250, 500, 6, "url", 209}, + {100, 30, 2, "url", 25}, + {400, 99, 3, "url", 67}, + {500, 100, 5, "url", 100}, + {500, 200, 5, "url", 100}, + {500, 100, 1, "url", 100}, } for _, tt := range tests { t.Run(fmt.Sprintf("%d/%d", tt.totalSize, tt.maxEngineSize), func(t *testing.T) { e := &LoadDataController{ Plan: &Plan{ - MaxEngineSize: tt.maxEngineSize, - TotalFileSize: tt.totalSize, + MaxEngineSize: tt.maxEngineSize, + TotalFileSize: tt.totalSize, + CloudStorageURI: tt.cloudStorageURL, }, + ExecuteNodesCnt: tt.executeNodeCnt, } if got := e.getAdjustedMaxEngineSize(); got != tt.want { t.Errorf("getAdjustedMaxEngineSize() = %v, want %v", got, tt.want)