Skip to content

Commit

Permalink
disttask: rename dispatcher to scheduler (#49182)
Browse files Browse the repository at this point in the history
close #49125
  • Loading branch information
okJiang authored Dec 13, 2023
1 parent 9322568 commit 64430c3
Show file tree
Hide file tree
Showing 61 changed files with 1,241 additions and 1,240 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ mock_lightning: tools/bin/mockgen

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor TaskTable,Pool,TaskExecutor,Extension > pkg/disttask/framework/mock/task_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Dispatcher,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Extension > pkg/disttask/framework/dispatcher/mock/dispatcher_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Scheduler,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler Extension > pkg/disttask/framework/scheduler/mock/scheduler_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute SubtaskExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ go_library(
srcs = [
"backfilling.go",
"backfilling_clean_s3.go",
"backfilling_dispatcher.go",
"backfilling_dist_executor.go",
"backfilling_dist_scheduler.go",
"backfilling_import_cloud.go",
"backfilling_merge_sort.go",
"backfilling_operators.go",
Expand Down Expand Up @@ -78,9 +78,9 @@ go_library(
"//pkg/ddl/syncer",
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/disttask/framework/dispatcher",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/taskexecutor",
"//pkg/disttask/framework/taskexecutor/execute",
Expand Down Expand Up @@ -183,7 +183,7 @@ go_test(
timeout = "moderate",
srcs = [
"attributes_sql_test.go",
"backfilling_dispatcher_test.go",
"backfilling_dist_scheduler_test.go",
"backfilling_test.go",
"bench_test.go",
"cancel_test.go",
Expand Down Expand Up @@ -253,8 +253,8 @@ go_test(
"//pkg/ddl/testutil",
"//pkg/ddl/util",
"//pkg/ddl/util/callback",
"//pkg/disttask/framework/dispatcher",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/domain",
"//pkg/domain/infosync",
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/backfilling_clean_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ import (

"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

var _ dispatcher.CleanUpRoutine = (*BackfillCleanUpS3)(nil)
var _ scheduler.CleanUpRoutine = (*BackfillCleanUpS3)(nil)

// BackfillCleanUpS3 implements dispatcher.CleanUpRoutine.
// BackfillCleanUpS3 implements scheduler.CleanUpRoutine.
type BackfillCleanUpS3 struct {
}

func newBackfillCleanUpS3() dispatcher.CleanUpRoutine {
func newBackfillCleanUpS3() scheduler.CleanUpRoutine {
return &BackfillCleanUpS3{}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
Expand All @@ -42,33 +42,33 @@ import (
"go.uber.org/zap"
)

// BackfillingDispatcherExt is an extension of litBackfillDispatcher, exported for test.
type BackfillingDispatcherExt struct {
// BackfillingSchedulerExt is an extension of litBackfillScheduler, exported for test.
type BackfillingSchedulerExt struct {
d *ddl
GlobalSort bool
}

// NewBackfillingDispatcherExt creates a new backfillingDispatcherExt, only used for test now.
func NewBackfillingDispatcherExt(d DDL) (dispatcher.Extension, error) {
// NewBackfillingSchedulerExt creates a new backfillingSchedulerExt, only used for test now.
func NewBackfillingSchedulerExt(d DDL) (scheduler.Extension, error) {
ddl, ok := d.(*ddl)
if !ok {
return nil, errors.New("The getDDL result should be the type of *ddl")
}
return &BackfillingDispatcherExt{
return &BackfillingSchedulerExt{
d: ddl,
}, nil
}

var _ dispatcher.Extension = (*BackfillingDispatcherExt)(nil)
var _ scheduler.Extension = (*BackfillingSchedulerExt)(nil)

// OnTick implements dispatcher.Extension interface.
func (*BackfillingDispatcherExt) OnTick(_ context.Context, _ *proto.Task) {
// OnTick implements scheduler.Extension interface.
func (*BackfillingSchedulerExt) OnTick(_ context.Context, _ *proto.Task) {
}

// OnNextSubtasksBatch generate batch of next step's plan.
func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
ctx context.Context,
taskHandle dispatcher.TaskHandle,
taskHandle scheduler.TaskHandle,
task *proto.Task,
serverInfo []*infosync.ServerInfo,
nextStep proto.Step,
Expand All @@ -84,7 +84,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
return nil, err
}
job := &backfillMeta.Job
tblInfo, err := getTblInfo(dsp.d, job)
tblInfo, err := getTblInfo(sch.d, job)
if err != nil {
return nil, err
}
Expand All @@ -96,7 +96,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(dsp.d, tblInfo, job, dsp.GlobalSort, len(serverInfo))
return generateNonPartitionPlan(sch.d, tblInfo, job, sch.GlobalSort, len(serverInfo))
case StepMergeSort:
res, err := generateMergePlan(taskHandle, task, logger)
if err != nil {
Expand All @@ -110,7 +110,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
}
return res, nil
case StepWriteAndIngest:
if dsp.GlobalSort {
if sch.GlobalSort {
prevStep := StepReadIndex
if backfillMeta.UseMergeSort {
prevStep = StepMergeSort
Expand Down Expand Up @@ -149,13 +149,13 @@ func updateMeta(task *proto.Task, taskMeta *BackfillTaskMeta) error {
return nil
}

// GetNextStep implements dispatcher.Extension interface.
func (dsp *BackfillingDispatcherExt) GetNextStep(task *proto.Task) proto.Step {
// GetNextStep implements scheduler.Extension interface.
func (sch *BackfillingSchedulerExt) GetNextStep(task *proto.Task) proto.Step {
switch task.Step {
case proto.StepInit:
return StepReadIndex
case StepReadIndex:
if dsp.GlobalSort {
if sch.GlobalSort {
return StepMergeSort
}
return proto.StepDone
Expand All @@ -175,55 +175,55 @@ func skipMergeSort(stats []external.MultipleFilesStat) bool {
return external.GetMaxOverlappingTotal(stats) <= external.MergeSortOverlapThreshold
}

// OnDone implements dispatcher.Extension interface.
func (*BackfillingDispatcherExt) OnDone(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task) error {
// OnDone implements scheduler.Extension interface.
func (*BackfillingSchedulerExt) OnDone(_ context.Context, _ scheduler.TaskHandle, _ *proto.Task) error {
return nil
}

// GetEligibleInstances implements dispatcher.Extension interface.
func (*BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, bool, error) {
serverInfos, err := dispatcher.GenerateTaskExecutorNodes(ctx)
// GetEligibleInstances implements scheduler.Extension interface.
func (*BackfillingSchedulerExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, bool, error) {
serverInfos, err := scheduler.GenerateTaskExecutorNodes(ctx)
if err != nil {
return nil, true, err
}
return serverInfos, true, nil
}

// IsRetryableErr implements dispatcher.Extension.IsRetryableErr interface.
func (*BackfillingDispatcherExt) IsRetryableErr(error) bool {
// IsRetryableErr implements scheduler.Extension.IsRetryableErr interface.
func (*BackfillingSchedulerExt) IsRetryableErr(error) bool {
return true
}

// LitBackfillDispatcher wraps BaseDispatcher.
type LitBackfillDispatcher struct {
*dispatcher.BaseDispatcher
// LitBackfillScheduler wraps BaseScheduler.
type LitBackfillScheduler struct {
*scheduler.BaseScheduler
d *ddl
}

func newLitBackfillDispatcher(ctx context.Context, d *ddl, taskMgr dispatcher.TaskManager,
serverID string, task *proto.Task) dispatcher.Dispatcher {
dsp := LitBackfillDispatcher{
d: d,
BaseDispatcher: dispatcher.NewBaseDispatcher(ctx, taskMgr, serverID, task),
func newLitBackfillScheduler(ctx context.Context, d *ddl, taskMgr scheduler.TaskManager,
serverID string, task *proto.Task) scheduler.Scheduler {
sch := LitBackfillScheduler{
d: d,
BaseScheduler: scheduler.NewBaseScheduler(ctx, taskMgr, serverID, task),
}
return &dsp
return &sch
}

// Init implements BaseDispatcher interface.
func (dsp *LitBackfillDispatcher) Init() (err error) {
// Init implements BaseScheduler interface.
func (sch *LitBackfillScheduler) Init() (err error) {
taskMeta := &BackfillTaskMeta{}
if err = json.Unmarshal(dsp.BaseDispatcher.Task.Meta, taskMeta); err != nil {
if err = json.Unmarshal(sch.BaseScheduler.Task.Meta, taskMeta); err != nil {
return errors.Annotate(err, "unmarshal task meta failed")
}
dsp.BaseDispatcher.Extension = &BackfillingDispatcherExt{
d: dsp.d,
sch.BaseScheduler.Extension = &BackfillingSchedulerExt{
d: sch.d,
GlobalSort: len(taskMeta.CloudStorageURI) > 0}
return dsp.BaseDispatcher.Init()
return sch.BaseScheduler.Init()
}

// Close implements BaseDispatcher interface.
func (dsp *LitBackfillDispatcher) Close() {
dsp.BaseDispatcher.Close()
// Close implements BaseScheduler interface.
func (sch *LitBackfillScheduler) Close() {
sch.BaseScheduler.Close()
}

func getTblInfo(d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
Expand Down Expand Up @@ -338,7 +338,7 @@ func calculateRegionBatch(totalRegionCnt int, instanceCnt int, useLocalDisk bool

func generateGlobalSortIngestPlan(
ctx context.Context,
taskHandle dispatcher.TaskHandle,
taskHandle scheduler.TaskHandle,
task *proto.Task,
jobID int64,
cloudStorageURI string,
Expand All @@ -349,7 +349,7 @@ func generateGlobalSortIngestPlan(
if err != nil {
return nil, err
}
instanceIDs, err := dispatcher.GenerateTaskExecutorNodes(ctx)
instanceIDs, err := scheduler.GenerateTaskExecutorNodes(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -409,7 +409,7 @@ func generateGlobalSortIngestPlan(
}

func generateMergePlan(
taskHandle dispatcher.TaskHandle,
taskHandle scheduler.TaskHandle,
task *proto.Task,
logger *zap.Logger,
) ([][]byte, error) {
Expand Down Expand Up @@ -502,7 +502,7 @@ func getRangeSplitter(
}

func getSummaryFromLastStep(
taskHandle dispatcher.TaskHandle,
taskHandle scheduler.TaskHandle,
gTaskID int64,
step proto.Step,
) (startKey, endKey kv.Key, totalKVSize uint64, dataFiles, statFiles []string, err error) {
Expand Down
Loading

0 comments on commit 64430c3

Please sign in to comment.