diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel
index 9cd864171f079..383611545501f 100644
--- a/pkg/ddl/BUILD.bazel
+++ b/pkg/ddl/BUILD.bazel
@@ -16,7 +16,6 @@ go_library(
         "backfilling_dispatcher.go",
         "backfilling_dist_scheduler.go",
         "backfilling_import_cloud.go",
-        "backfilling_import_local.go",
         "backfilling_merge_sort.go",
         "backfilling_operators.go",
         "backfilling_proto.go",
@@ -130,7 +129,6 @@ go_library(
         "//pkg/util/collate",
         "//pkg/util/dbterror",
         "//pkg/util/dbterror/exeerrors",
-        "//pkg/util/disttask",
         "//pkg/util/domainutil",
         "//pkg/util/filter",
         "//pkg/util/gcutil",
diff --git a/pkg/ddl/backfilling_dispatcher.go b/pkg/ddl/backfilling_dispatcher.go
index cb235c2ceb045..d80953b792cb4 100644
--- a/pkg/ddl/backfilling_dispatcher.go
+++ b/pkg/ddl/backfilling_dispatcher.go
@@ -34,10 +34,9 @@ import (
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/meta"
 	"github.com/pingcap/tidb/pkg/parser/model"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/store/helper"
 	"github.com/pingcap/tidb/pkg/table"
-	disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
-	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/zap"
@@ -45,9 +44,8 @@

 // BackfillingDispatcherExt is an extension of litBackfillDispatcher, exported for test.
 type BackfillingDispatcherExt struct {
-	d                    *ddl
-	previousSchedulerIDs []string
-	GlobalSort           bool
+	d          *ddl
+	GlobalSort bool
 }

 // NewBackfillingDispatcherExt creates a new backfillingDispatcherExt, only used for test now.
@@ -97,7 +95,12 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
 		if tblInfo.Partition != nil {
 			return generatePartitionPlan(tblInfo)
 		}
-		return generateNonPartitionPlan(dsp.d, tblInfo, job)
+		is, err := dsp.GetEligibleInstances(ctx, gTask)
+		if err != nil {
+			return nil, err
+		}
+		instanceCnt := len(is)
+		return generateNonPartitionPlan(dsp.d, tblInfo, job, dsp.GlobalSort, instanceCnt)
 	case StepMergeSort:
 		res, err := generateMergePlan(taskHandle, gTask, logger)
 		if err != nil {
@@ -135,11 +138,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
 				prevStep,
 				logger)
 		}
-		// for partition table, no subtasks for write and ingest step.
-		if tblInfo.Partition != nil {
-			return nil, nil
-		}
-		return generateIngestTaskPlan(ctx, dsp, taskHandle, gTask)
+		return nil, nil
 	default:
 		return nil, nil
 	}
@@ -163,7 +162,7 @@ func (dsp *BackfillingDispatcherExt) GetNextStep(task *proto.Task) proto.Step {
 		if dsp.GlobalSort {
 			return StepMergeSort
 		}
-		return StepWriteAndIngest
+		return proto.StepDone
 	case StepMergeSort:
 		return StepWriteAndIngest
 	case StepWriteAndIngest:
@@ -196,21 +195,11 @@ func (*BackfillingDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.Task
 }

 // GetEligibleInstances implements dispatcher.Extension interface.
-func (dsp *BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
+func (*BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
 	serverInfos, err := dispatcher.GenerateSchedulerNodes(ctx)
 	if err != nil {
 		return nil, err
 	}
-	if len(dsp.previousSchedulerIDs) > 0 {
-		// Only the nodes that executed step one can have step two.
-		involvedServerInfos := make([]*infosync.ServerInfo, 0, len(serverInfos))
-		for _, id := range dsp.previousSchedulerIDs {
-			if idx := disttaskutil.FindServerInfo(serverInfos, id); idx >= 0 {
-				involvedServerInfos = append(involvedServerInfos, serverInfos[idx])
-			}
-		}
-		return involvedServerInfos, nil
-	}
 	return serverInfos, nil
 }

@@ -286,7 +275,8 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error)
 	return subTaskMetas, nil
 }

-func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job) (metas [][]byte, err error) {
+func generateNonPartitionPlan(
+	d *ddl, tblInfo *model.TableInfo, job *model.Job, useCloud bool, instanceCnt int) (metas [][]byte, err error) {
 	tbl, err := getTable(d.store, job.SchemaID, tblInfo)
 	if err != nil {
 		return nil, err
@@ -309,8 +299,15 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
 		return nil, err
 	}

-	subTaskMetas := make([][]byte, 0, 100)
-	regionBatch := 20
+	regionBatch := 100
+	if !useCloud {
+		// Make subtask large enough to reduce the overhead of local/global flush.
+		quota := variable.DDLDiskQuota.Load()
+		regionBatch = int(int64(quota) / int64(config.SplitRegionSize))
+	}
+	regionBatch = min(regionBatch, len(recordRegionMetas)/instanceCnt)
+
+	subTaskMetas := make([][]byte, 0, 4)
 	sort.Slice(recordRegionMetas, func(i, j int) bool {
 		return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
 	})
@@ -341,42 +338,6 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
 	return subTaskMetas, nil
 }

-func generateIngestTaskPlan(
-	ctx context.Context,
-	h *BackfillingDispatcherExt,
-	taskHandle dispatcher.TaskHandle,
-	gTask *proto.Task,
-) ([][]byte, error) {
-	// We dispatch dummy subtasks because the rest data in local engine will be imported
-	// in the initialization of subtask executor.
-	var ingestSubtaskCnt int
-	if intest.InTest && taskHandle == nil {
-		serverNodes, err := dispatcher.GenerateSchedulerNodes(ctx)
-		if err != nil {
-			return nil, err
-		}
-		ingestSubtaskCnt = len(serverNodes)
-	} else {
-		schedulerIDs, err := taskHandle.GetPreviousSchedulerIDs(ctx, gTask.ID, gTask.Step)
-		if err != nil {
-			return nil, err
-		}
-		h.previousSchedulerIDs = schedulerIDs
-		ingestSubtaskCnt = len(schedulerIDs)
-	}
-
-	subTaskMetas := make([][]byte, 0, ingestSubtaskCnt)
-	dummyMeta := &BackfillSubTaskMeta{}
-	metaBytes, err := json.Marshal(dummyMeta)
-	if err != nil {
-		return nil, err
-	}
-	for i := 0; i < ingestSubtaskCnt; i++ {
-		subTaskMetas = append(subTaskMetas, metaBytes)
-	}
-	return subTaskMetas, nil
-}
-
 func generateGlobalSortIngestPlan(
 	ctx context.Context,
 	taskHandle dispatcher.TaskHandle,
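A note on the @@ -309,8 +299,15 @@ hunk above: local sort now sizes each subtask by disk quota instead of the old fixed 20-region batch, then caps the batch so every eligible instance still gets work. Below is a minimal runnable sketch of that arithmetic, assuming the defaults we believe apply (tidb_ddl_disk_quota at 100 GiB, config.SplitRegionSize at 96 MiB); regionBatch is an illustrative helper, not the TiDB function itself.

package main

import "fmt"

// regionBatch mirrors the sizing logic in generateNonPartitionPlan:
// cloud (global sort) keeps a fixed 100-region batch, local sort sizes
// subtasks by disk quota to cut flush overhead, and the result is capped
// so the regions still spread across all eligible instances.
func regionBatch(useCloud bool, quota, splitRegionSize uint64, regionCnt, instanceCnt int) int {
	batch := 100
	if !useCloud {
		batch = int(quota / splitRegionSize)
	}
	if perInstance := regionCnt / instanceCnt; perInstance < batch {
		batch = perInstance
	}
	return batch
}

func main() {
	var (
		quota           uint64 = 100 << 30 // assumed default of tidb_ddl_disk_quota: 100 GiB
		splitRegionSize uint64 = 96 << 20  // assumed default region split size: 96 MiB
	)
	// 100 GiB / 96 MiB is roughly 1066 regions per subtask, but a 2000-region
	// table spread over 4 instances caps the batch at 2000/4 = 500.
	fmt.Println(regionBatch(false, quota, splitRegionSize, 2000, 4)) // 500
}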
diff --git a/pkg/ddl/backfilling_dispatcher_test.go b/pkg/ddl/backfilling_dispatcher_test.go
index 8093e6a52f610..87ed8bc88fac0 100644
--- a/pkg/ddl/backfilling_dispatcher_test.go
+++ b/pkg/ddl/backfilling_dispatcher_test.go
@@ -80,12 +80,6 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
 	// 1.2 test partition table OnNextSubtasksBatch after StepReadIndex
 	gTask.State = proto.TaskStateRunning
 	gTask.Step = dsp.GetNextStep(gTask)
-	require.Equal(t, ddl.StepWriteAndIngest, gTask.Step)
-	// for partition table, we will not generate subtask for StepWriteAndIngest.
-	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
-	require.NoError(t, err)
-	require.Len(t, metas, 0)
-	gTask.Step = dsp.GetNextStep(gTask)
 	require.Equal(t, proto.StepDone, gTask.Step)
 	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
 	require.NoError(t, err)
@@ -123,11 +117,6 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
 	// 2.2.2 StepReadIndex
 	gTask.State = proto.TaskStateRunning
 	gTask.Step = dsp.GetNextStep(gTask)
-	require.Equal(t, ddl.StepWriteAndIngest, gTask.Step)
-	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
-	require.NoError(t, err)
-	require.Equal(t, 1, len(metas))
-	gTask.Step = dsp.GetNextStep(gTask)
 	require.Equal(t, proto.StepDone, gTask.Step)
 	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
 	require.NoError(t, err)
@@ -267,7 +256,7 @@ func TestGetNextStep(t *testing.T) {
 	ext := &ddl.BackfillingDispatcherExt{}

 	// 1. local mode
-	for _, nextStep := range []proto.Step{ddl.StepReadIndex, ddl.StepWriteAndIngest} {
+	for _, nextStep := range []proto.Step{ddl.StepReadIndex, proto.StepDone} {
 		require.Equal(t, nextStep, ext.GetNextStep(task))
 		task.Step = nextStep
 	}
diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go
index afe756be494d3..8971cb5440c09 100644
--- a/pkg/ddl/backfilling_dist_scheduler.go
+++ b/pkg/ddl/backfilling_dist_scheduler.go
@@ -93,7 +93,7 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
 		if len(bgm.CloudStorageURI) > 0 {
 			return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
 		}
-		return newImportFromLocalStepExecutor(jobMeta.ID, indexInfos, tbl.(table.PhysicalTable), bc), nil
+		return nil, errors.Errorf("local import does not have write & ingest step")
 	default:
 		return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
 	}
diff --git a/pkg/ddl/backfilling_import_local.go b/pkg/ddl/backfilling_import_local.go
deleted file mode 100644
index 032f249261f94..0000000000000
--- a/pkg/ddl/backfilling_import_local.go
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright 2023 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package ddl
-
-import (
-	"context"
-
-	"github.com/pingcap/tidb/br/pkg/lightning/common"
-	"github.com/pingcap/tidb/pkg/ddl/ingest"
-	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
-	"github.com/pingcap/tidb/pkg/parser/model"
-	"github.com/pingcap/tidb/pkg/table"
-	"github.com/pingcap/tidb/pkg/util/logutil"
-)
-
-type localImportExecutor struct {
-	jobID   int64
-	indexes []*model.IndexInfo
-	ptbl    table.PhysicalTable
-	bc      ingest.BackendCtx
-}
-
-func newImportFromLocalStepExecutor(
-	jobID int64,
-	indexes []*model.IndexInfo,
-	ptbl table.PhysicalTable,
-	bc ingest.BackendCtx,
-) *localImportExecutor {
-	return &localImportExecutor{
-		jobID:   jobID,
-		indexes: indexes,
-		ptbl:    ptbl,
-		bc:      bc,
-	}
-}
-
-func (i *localImportExecutor) Init(ctx context.Context) error {
-	logutil.Logger(ctx).Info("local import executor init subtask exec env")
-	for _, index := range i.indexes {
-		_, _, err := i.bc.Flush(index.ID, ingest.FlushModeForceGlobal)
-		if err != nil {
-			if common.ErrFoundDuplicateKeys.Equal(err) {
-				err = convertToKeyExistsErr(err, index, i.ptbl.Meta())
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-func (*localImportExecutor) RunSubtask(ctx context.Context, _ *proto.Subtask) error {
-	logutil.Logger(ctx).Info("local import executor run subtask")
-	return nil
-}
-
-func (*localImportExecutor) Cleanup(ctx context.Context) error {
-	logutil.Logger(ctx).Info("local import executor cleanup subtask exec env")
-	return nil
-}
-
-func (*localImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error {
-	logutil.Logger(ctx).Info("local import executor finish subtask")
-	return nil
-}
-
-func (*localImportExecutor) Rollback(ctx context.Context) error {
-	logutil.Logger(ctx).Info("local import executor rollback subtask")
-	return nil
-}
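The file deleted above existed only to force one last global flush in the write-and-ingest step, importing whatever the read-index step had left sitting in the local engine. The backfilling_operators.go hunk that follows removes the need for it by force-ingesting on every sink flush. A toy, self-contained sketch of that behavioral difference, where flushMode, engine, and the constant names are stand-ins rather than the real ingest package API:

package main

import "fmt"

type flushMode int

const (
	// forceLocalAndCheckDiskQuota spills to the local engine only; the
	// data stays pending until a dedicated write-and-ingest step imports it.
	forceLocalAndCheckDiskQuota flushMode = iota
	// forceGlobal ingests into the cluster on every flush.
	forceGlobal
)

// engine is a toy stand-in for the ingest backend's local engine.
type engine struct{ pendingKVs int }

func (e *engine) flush(mode flushMode) {
	if mode == forceGlobal {
		e.pendingKVs = 0 // nothing is left behind for a later step
	}
}

func main() {
	before := &engine{pendingKVs: 1 << 20}
	after := &engine{pendingKVs: 1 << 20}
	before.flush(forceLocalAndCheckDiskQuota)
	after.flush(forceGlobal)
	fmt.Println(before.pendingKVs) // non-zero: needed StepWriteAndIngest to finish
	fmt.Println(after.pendingKVs)  // 0: StepReadIndex already ingested everything
}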
diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go
index f900cc229dccb..694c71997fb4b 100644
--- a/pkg/ddl/backfilling_operators.go
+++ b/pkg/ddl/backfilling_operators.go
@@ -734,13 +734,9 @@ func (s *indexWriteResultSink) flush() error {
 	failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
 		failpoint.Return(errors.New("mock flush error"))
 	})
-	flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota
-	if s.tbl.GetPartitionedTable() != nil {
-		flushMode = ingest.FlushModeForceGlobal
-	}
 	for _, index := range s.indexes {
 		idxInfo := index.Meta()
-		_, _, err := s.backendCtx.Flush(idxInfo.ID, flushMode)
+		_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal)
 		if err != nil {
 			if common.ErrFoundDuplicateKeys.Equal(err) {
 				err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
diff --git a/pkg/ddl/backfilling_proto.go b/pkg/ddl/backfilling_proto.go
index 79e3d800065f1..488124ba62ffe 100644
--- a/pkg/ddl/backfilling_proto.go
+++ b/pkg/ddl/backfilling_proto.go
@@ -20,7 +20,7 @@ import "github.com/pingcap/tidb/pkg/disttask/framework/proto"
 // the initial step is StepInit(-1)
 // steps are processed in the following order:
 // - local sort:
-// StepInit -> StepReadIndex -> StepWriteAndIngest -> StepDone
+// StepInit -> StepReadIndex -> StepDone
 // - global sort:
 // StepInit -> StepReadIndex -> StepMergeSort -> StepWriteAndIngest -> StepDone
 const (
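Read together with the backfilling_proto.go comment above, the new step state machine is small enough to express as one transition function. A self-contained sketch mirroring GetNextStep after this change, with string names standing in for the proto.Step constants:

package main

import "fmt"

// nextStep mirrors BackfillingDispatcherExt.GetNextStep after this change:
// local sort finishes right after the read-index step, while global sort
// still merges sorted files and then ingests them from cloud storage.
func nextStep(step string, globalSort bool) string {
	switch step {
	case "StepInit":
		return "StepReadIndex"
	case "StepReadIndex":
		if globalSort {
			return "StepMergeSort"
		}
		return "StepDone" // was StepWriteAndIngest before this change
	case "StepMergeSort":
		return "StepWriteAndIngest"
	default:
		return "StepDone"
	}
}

func main() {
	for _, globalSort := range []bool{false, true} {
		step := "StepInit"
		for step != "StepDone" {
			step = nextStep(step, globalSort)
			fmt.Printf("globalSort=%v -> %s\n", globalSort, step)
		}
	}
}

Running it prints StepReadIndex -> StepDone for local sort, and StepReadIndex -> StepMergeSort -> StepWriteAndIngest -> StepDone for global sort, matching the updated comment in backfilling_proto.go.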