importinto: calculate subtask count for global sort (#48169) (#48258)
ref #47572
ti-chi-bot authored Nov 3, 2023
1 parent d3d5972 commit aa93091
Showing 7 changed files with 124 additions and 31 deletions.
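As an example of the new behavior (numbers taken from the test cases below): with TotalFileSize = 750, MaxEngineSize = 500 and 4 execute nodes, a local-sort import still plans round(750/500) = 2 subtasks with an adjusted engine size of ceil(750/2) = 375, while a global-sort import now rounds the subtask count up to the next multiple of the node count, giving 4 subtasks of ceil(750/4) = 188 bytes each so that every node receives work; since globally sorted subtasks have no range overlap, the extra split is cheap.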
1 change: 1 addition & 0 deletions pkg/disttask/framework/planner/plan.go
@@ -37,6 +37,7 @@ type PlanCtx struct {
 	PreviousSubtaskMetas map[proto.Step][][]byte
 	GlobalSort           bool
 	NextTaskStep         proto.Step
+	ExecuteNodesCnt      int
 }

 // LogicalPlan represents a logical plan in distribute framework.
6 changes: 6 additions & 0 deletions pkg/disttask/importinto/dispatcher.go
@@ -299,12 +299,18 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(
 		return nil, errors.Errorf("unknown step %d", gTask.Step)
 	}

+	eligibleInstances, err := dsp.GetEligibleInstances(ctx, gTask)
+	if err != nil {
+		logger.Warn("failed to get eligible instances", zap.Error(err))
+	}
+
 	planCtx := planner.PlanCtx{
 		Ctx:                  ctx,
 		TaskID:               gTask.ID,
 		PreviousSubtaskMetas: previousSubtaskMetas,
 		GlobalSort:           dsp.GlobalSort,
 		NextTaskStep:         nextStep,
+		ExecuteNodesCnt:      len(eligibleInstances),
 	}
 	logicalPlan := &LogicalPlan{}
 	if err := logicalPlan.FromTaskMeta(gTask.Meta); err != nil {
9 changes: 5 additions & 4 deletions pkg/disttask/importinto/planner.go
@@ -109,7 +109,7 @@ func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.Physical
 	// we only generate needed plans for the next step.
 	switch planCtx.NextTaskStep {
 	case StepImport, StepEncodeAndSort:
-		specs, err := generateImportSpecs(planCtx.Ctx, p)
+		specs, err := generateImportSpecs(planCtx, p)
 		if err != nil {
 			return nil, err
 		}
@@ -249,7 +249,7 @@ func buildController(plan *importer.Plan, stmt string) (*importer.LoadDataContro
 	return controller, nil
 }

-func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]planner.PipelineSpec, error) {
+func generateImportSpecs(pCtx planner.PlanCtx, p *LogicalPlan) ([]planner.PipelineSpec, error) {
 	var chunkMap map[int32][]Chunk
 	if len(p.ChunkMap) > 0 {
 		chunkMap = p.ChunkMap
@@ -258,11 +258,12 @@ func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]planner.Pipelin
 		if err2 != nil {
 			return nil, err2
 		}
-		if err2 = controller.InitDataFiles(ctx); err2 != nil {
+		if err2 = controller.InitDataFiles(pCtx.Ctx); err2 != nil {
 			return nil, err2
 		}

-		engineCheckpoints, err2 := controller.PopulateChunks(ctx)
+		controller.SetExecuteNodeCnt(pCtx.ExecuteNodesCnt)
+		engineCheckpoints, err2 := controller.PopulateChunks(pCtx.Ctx)
 		if err2 != nil {
 			return nil, err2
 		}
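The dispatcher and planner changes above thread the eligible-node count from scheduling down into chunk planning. A minimal, self-contained sketch of that flow, using toy stand-ins for the real PlanCtx and LoadDataController (the types and names below are illustrative, not the framework's):

package main

import "fmt"

// Toy stand-ins for the framework types; the real PlanCtx and
// LoadDataController live in pkg/disttask/framework/planner and
// pkg/executor/importer.
type planCtx struct {
	executeNodesCnt int
}

type controller struct {
	executeNodesCnt int // defaults to 1, mirroring NewLoadDataController
}

func (c *controller) setExecuteNodeCnt(cnt int) { c.executeNodesCnt = cnt }

func main() {
	// dispatcher side: count eligible instances and record it in the plan context
	eligibleInstances := []string{"tidb-0", "tidb-1", "tidb-2"}
	pctx := planCtx{executeNodesCnt: len(eligibleInstances)}

	// planner side: hand the count to the controller before chunks are populated
	c := &controller{executeNodesCnt: 1}
	c.setExecuteNodeCnt(pctx.executeNodesCnt)
	fmt.Println(c.executeNodesCnt) // prints 3
}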
2 changes: 1 addition & 1 deletion pkg/executor/importer/BUILD.bazel
@@ -88,7 +88,7 @@ go_test(
     embed = [":importer"],
     flaky = True,
     race = "on",
-    shard_count = 19,
+    shard_count = 20,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/lightning/backend/encode",
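(The shard_count bump tracks the new TestCalculateSubtaskCnt function; the package's convention appears to be one Bazel test shard per top-level test.)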
11 changes: 7 additions & 4 deletions pkg/executor/importer/import.go
@@ -264,6 +264,8 @@ type LoadDataController struct {
 	dataFiles []*mydump.SourceFileMeta
 	// GlobalSortStore is used to store sorted data when using global sort.
 	GlobalSortStore storage.ExternalStorage
+	// ExecuteNodesCnt is the count of execute nodes.
+	ExecuteNodesCnt int
 }

 func getImportantSysVars(sctx sessionctx.Context) map[string]string {
@@ -452,10 +454,11 @@ func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*Load
 	fullTableName := tbl.Meta().Name.String()
 	logger := log.L().With(zap.String("table", fullTableName))
 	c := &LoadDataController{
-		Plan:    plan,
-		ASTArgs: astArgs,
-		Table:   tbl,
-		logger:  logger,
+		Plan:            plan,
+		ASTArgs:         astArgs,
+		Table:           tbl,
+		logger:          logger,
+		ExecuteNodesCnt: 1,
 	}
 	if err := c.checkFieldParams(); err != nil {
 		return nil, err
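Defaulting ExecuteNodesCnt to 1 keeps behavior sensible for callers that never go through the dist-task dispatcher and so never call SetExecuteNodeCnt: rounding the subtask count up to a multiple of 1 is a no-op, so those paths keep the old result.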
37 changes: 28 additions & 9 deletions pkg/executor/importer/table_import.go
@@ -299,12 +299,11 @@ func (ti *TableImporter) getKVEncoder(chunk *checkpoints.ChunkCheckpoint) (KVEnc
 	return NewTableKVEncoder(cfg, ti)
 }

-func (e *LoadDataController) getAdjustedMaxEngineSize() int64 {
+func (e *LoadDataController) calculateSubtaskCnt() int {
 	// we want to split data files into subtask of size close to MaxEngineSize to reduce range overlap,
 	// and evenly distribute them to subtasks.
-	// so we adjust MaxEngineSize to make sure each subtask has a similar amount of data to import.
-	// we calculate subtask count first by round(TotalFileSize / maxEngineSize), then adjust maxEngineSize
-	//
+	// we calculate subtask count first by round(TotalFileSize / maxEngineSize)

 	// AllocateEngineIDs is using ceil() to calculate subtask count, engine size might be too small in some case,
 	// such as 501G data, maxEngineSize will be about 250G, so we don't relay on it.
 	// see https://github.com/pingcap/tidb/blob/b4183e1dc9bb01fb81d3aa79ca4b5b74387c6c2a/br/pkg/lightning/mydump/region.go#L109
@@ -315,13 +314,33 @@ func (e *LoadDataController) getAdjustedMaxEngineSize() int64 {
 	// [750, 1250)     2          [375, 625)
 	// [1250, 1750)    3          [416, 583)
 	// [1750, 2250)    4          [437, 562)
-	maxEngineSize := int64(e.MaxEngineSize)
+	var (
+		subtaskCount  float64
+		maxEngineSize = int64(e.MaxEngineSize)
+	)
 	if e.TotalFileSize <= maxEngineSize {
-		return e.TotalFileSize
+		subtaskCount = 1
+	} else {
+		subtaskCount = math.Round(float64(e.TotalFileSize) / float64(e.MaxEngineSize))
+	}
+
+	// for global sort task, since there is no overlap,
+	// we make sure subtask count is a multiple of execute nodes count
+	if e.IsGlobalSort() && e.ExecuteNodesCnt > 0 {
+		subtaskCount = math.Ceil(subtaskCount/float64(e.ExecuteNodesCnt)) * float64(e.ExecuteNodesCnt)
 	}
-	subtaskCount := math.Round(float64(e.TotalFileSize) / float64(maxEngineSize))
-	adjusted := math.Ceil(float64(e.TotalFileSize) / subtaskCount)
-	return int64(adjusted)
+	return int(subtaskCount)
+}
+
+func (e *LoadDataController) getAdjustedMaxEngineSize() int64 {
+	subtaskCount := e.calculateSubtaskCnt()
+	// we adjust MaxEngineSize to make sure each subtask has a similar amount of data to import.
+	return int64(math.Ceil(float64(e.TotalFileSize) / float64(subtaskCount)))
+}
+
+// SetExecuteNodeCnt sets the execute node count.
+func (e *LoadDataController) SetExecuteNodeCnt(cnt int) {
+	e.ExecuteNodesCnt = cnt
 }

 // PopulateChunks populates chunks from table regions.
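To see what the refactor computes, here is a self-contained sketch of the same arithmetic (it mirrors calculateSubtaskCnt and getAdjustedMaxEngineSize; globalSort stands in for IsGlobalSort(), and the numbers match the [750, 1250) row of the comment table above):

package main

import (
	"fmt"
	"math"
)

// subtaskCnt mirrors calculateSubtaskCnt: round to the nearest whole number
// of engine-sized subtasks, then, for global sort, round that count up to a
// multiple of the execute-node count.
func subtaskCnt(totalSize, maxEngineSize int64, nodes int, globalSort bool) int {
	cnt := 1.0
	if totalSize > maxEngineSize {
		cnt = math.Round(float64(totalSize) / float64(maxEngineSize))
	}
	if globalSort && nodes > 0 {
		cnt = math.Ceil(cnt/float64(nodes)) * float64(nodes)
	}
	return int(cnt)
}

// adjustedEngineSize mirrors getAdjustedMaxEngineSize.
func adjustedEngineSize(totalSize, maxEngineSize int64, nodes int, globalSort bool) int64 {
	cnt := subtaskCnt(totalSize, maxEngineSize, nodes, globalSort)
	return int64(math.Ceil(float64(totalSize) / float64(cnt)))
}

func main() {
	// local sort, 750 bytes of data, 500-byte engines, 4 nodes:
	// round(750/500) = 2 subtasks of ceil(750/2) = 375 bytes each.
	fmt.Println(subtaskCnt(750, 500, 4, false), adjustedEngineSize(750, 500, 4, false)) // 2 375
	// global sort on 4 nodes: 2 rounds up to 4 subtasks of ceil(750/4) = 188.
	fmt.Println(subtaskCnt(750, 500, 4, true), adjustedEngineSize(750, 500, 4, true)) // 4 188
}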
89 changes: 76 additions & 13 deletions pkg/executor/importer/table_import_test.go
@@ -80,29 +80,92 @@ func TestPrepareSortDir(t *testing.T) {
 	require.Nil(t, info)
 }

+func TestCalculateSubtaskCnt(t *testing.T) {
+	tests := []struct {
+		totalSize       int64
+		maxEngineSize   config.ByteSize
+		executeNodeCnt  int
+		cloudStorageURL string
+		want            int
+	}{
+		{1, 500, 0, "", 1},
+		{499, 500, 1, "", 1},
+		{500, 500, 2, "", 1},
+		{749, 500, 3, "", 1},
+		{750, 500, 4, "", 2},
+		{1249, 500, 5, "", 2},
+		{1250, 500, 6, "", 3},
+		{100, 30, 7, "", 3},
+
+		{1, 500, 0, "url", 1},
+		{499, 500, 1, "url", 1},
+		{500, 500, 2, "url", 2},
+		{749, 500, 3, "url", 3},
+		{750, 500, 4, "url", 4},
+		{1249, 500, 5, "url", 5},
+		{1250, 500, 6, "url", 6},
+		{100, 30, 2, "url", 4},
+		{400, 99, 3, "url", 6},
+		{500, 100, 5, "url", 5},
+		{500, 200, 5, "url", 5},
+	}
+	for _, tt := range tests {
+		t.Run(fmt.Sprintf("%d/%d", tt.totalSize, tt.maxEngineSize), func(t *testing.T) {
+			e := &LoadDataController{
+				Plan: &Plan{
+					MaxEngineSize:   tt.maxEngineSize,
+					TotalFileSize:   tt.totalSize,
+					CloudStorageURI: tt.cloudStorageURL,
+				},
+				ExecuteNodesCnt: tt.executeNodeCnt,
+			}
+			if got := e.calculateSubtaskCnt(); got != tt.want {
+				t.Errorf("calculateSubtaskCnt() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
 func TestLoadDataControllerGetAdjustedMaxEngineSize(t *testing.T) {
 	tests := []struct {
-		totalSize     int64
-		maxEngineSize config.ByteSize
-		want          int64
+		totalSize       int64
+		maxEngineSize   config.ByteSize
+		executeNodeCnt  int
+		cloudStorageURL string
+		want            int64
 	}{
-		{1, 500, 1},
-		{499, 500, 499},
-		{500, 500, 500},
-		{749, 500, 749},
-		{750, 500, 375},
-		{1249, 500, 625},
-		{1250, 500, 417},
+		{1, 500, 0, "", 1},
+		{499, 500, 1, "", 499},
+		{500, 500, 2, "", 500},
+		{749, 500, 3, "", 749},
+		{750, 500, 4, "", 375},
+		{1249, 500, 5, "", 625},
+		{1250, 500, 6, "", 417},
 		// ceil(100/3)
-		{100, 30, 34},
+		{100, 30, 7, "", 34},
+
+		{1, 500, 0, "url", 1},
+		{499, 500, 1, "url", 499},
+		{500, 500, 2, "url", 250},
+		{749, 500, 3, "url", 250},
+		{750, 500, 4, "url", 188},
+		{1249, 500, 5, "url", 250},
+		{1250, 500, 6, "url", 209},
+		{100, 30, 2, "url", 25},
+		{400, 99, 3, "url", 67},
+		{500, 100, 5, "url", 100},
+		{500, 200, 5, "url", 100},
+		{500, 100, 1, "url", 100},
 	}
 	for _, tt := range tests {
 		t.Run(fmt.Sprintf("%d/%d", tt.totalSize, tt.maxEngineSize), func(t *testing.T) {
 			e := &LoadDataController{
 				Plan: &Plan{
-					MaxEngineSize: tt.maxEngineSize,
-					TotalFileSize: tt.totalSize,
+					MaxEngineSize:   tt.maxEngineSize,
+					TotalFileSize:   tt.totalSize,
+					CloudStorageURI: tt.cloudStorageURL,
				},
+				ExecuteNodesCnt: tt.executeNodeCnt,
 			}
 			if got := e.getAdjustedMaxEngineSize(); got != tt.want {
 				t.Errorf("getAdjustedMaxEngineSize() = %v, want %v", got, tt.want)
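To run just the affected tests with plain Go tooling (assuming a full TiDB checkout, since the package has heavy dependencies):

go test ./pkg/executor/importer/ -run 'TestCalculateSubtaskCnt|TestLoadDataControllerGetAdjustedMaxEngineSize'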
