disttask: define TaskType/TaskState/Step instead of string/int64 (pin…
okJiang authored Oct 16, 2023
1 parent 476d9b3 commit b3dcf59
Showing 38 changed files with 242 additions and 222 deletions.
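The files below swap bare string/int64 parameters for defined types from the proto package. The type definitions themselves live in pkg/disttask/framework/proto and are not among the hunks shown here; what follows is a minimal sketch of what the call sites below imply, with underlying types inferred from usage and the constant values assumed rather than confirmed:

// Sketch only -- the real definitions are in pkg/disttask/framework/proto.
package proto

// TaskType labels a kind of distributed task.
type TaskType string

// Backfill replaces the deleted ddl.BackfillTaskType constant.
const Backfill TaskType = "backfill" // value assumed to match the old constant

// String lets TaskType be logged via zap.Stringer.
func (t TaskType) String() string { return string(t) }

// TaskState is a task's lifecycle state.
type TaskState string

// A few of the states referenced in this diff; the real set is larger.
const (
	TaskStatePending TaskState = "pending"
	TaskStateRunning TaskState = "running"
	TaskStatePaused  TaskState = "paused"
)

// String lets TaskState be logged via zap.Stringer.
func (s TaskState) String() string { return string(s) }

// Step identifies one stage of a multi-stage task. Its underlying type is an
// integer, which is why call sites below log it as zap.Int64(..., int64(step)).
type Step int64

const (
	StepInit Step = -1 // values assumed
	StepOne  Step = 1
)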
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_dispatcher.go
@@ -70,7 +70,7 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(
 	ctx context.Context,
 	taskHandle dispatcher.TaskHandle,
 	gTask *proto.Task,
-	step int64,
+	step proto.Step,
 ) (taskMeta [][]byte, err error) {
 	var gTaskMeta BackfillGlobalMeta
 	if err := json.Unmarshal(gTask.Meta, &gTaskMeta); err != nil {
@@ -109,7 +109,7 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(
 func (*backfillingDispatcherExt) GetNextStep(
 	taskHandle dispatcher.TaskHandle,
 	task *proto.Task,
-) int64 {
+) proto.Step {
 	switch task.Step {
 	case proto.StepInit:
 		return proto.StepOne
8 changes: 4 additions & 4 deletions pkg/ddl/backfilling_dispatcher_test.go
@@ -44,7 +44,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 		"PARTITION p1 VALUES LESS THAN (100),\n" +
 		"PARTITION p2 VALUES LESS THAN (1000),\n" +
 		"PARTITION p3 VALUES LESS THAN MAXVALUE\n);")
-	gTask := createAddIndexGlobalTask(t, dom, "test", "tp1", ddl.BackfillTaskType)
+	gTask := createAddIndexGlobalTask(t, dom, "test", "tp1", proto.Backfill)
 	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp1"))
 	require.NoError(t, err)
 	tblInfo := tbl.Meta()
@@ -87,7 +87,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 	/// 2. test non partition table.
 	// 2.1 empty table
 	tk.MustExec("create table t1(id int primary key, v int)")
-	gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.BackfillTaskType)
+	gTask = createAddIndexGlobalTask(t, dom, "test", "t1", proto.Backfill)
 	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(metas))
@@ -97,7 +97,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 	tk.MustExec("insert into t2 values (), (), (), (), (), ()")
 	tk.MustExec("insert into t2 values (), (), (), (), (), ()")
 	tk.MustExec("insert into t2 values (), (), (), (), (), ()")
-	gTask = createAddIndexGlobalTask(t, dom, "test", "t2", ddl.BackfillTaskType)
+	gTask = createAddIndexGlobalTask(t, dom, "test", "t2", proto.Backfill)
 	// 2.2.1 stepInit
 	gTask.Step = dsp.GetNextStep(nil, gTask)
 	metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
@@ -118,7 +118,7 @@ func TestBackfillingDispatcher(t *testing.T) {
 	require.Equal(t, 0, len(metas))
 }
 
-func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType string) *proto.Task {
+func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType proto.TaskType) *proto.Task {
 	db, ok := dom.InfoSchema().SchemaByName(model.NewCIStr(dbName))
 	require.True(t, ok)
 	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tblName))
5 changes: 1 addition & 4 deletions pkg/ddl/backfilling_dist_scheduler.go
@@ -54,7 +54,7 @@ type BackfillSubTaskMeta struct {
 
 // NewBackfillSubtaskExecutor creates a new backfill subtask executor.
 func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
-	bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) {
+	bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.SubtaskExecutor, error) {
 	bgm := &BackfillGlobalMeta{}
 	err := json.Unmarshal(taskMeta, bgm)
 	if err != nil {
@@ -96,9 +96,6 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
 	}
 }
 
-// BackfillTaskType is the type of backfill task.
-const BackfillTaskType = "backfill"
-
 type backfillDistScheduler struct {
 	*scheduler.BaseScheduler
 	d *ddl
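A note on the deleted constant: replacing the package-local const BackfillTaskType = "backfill" with the typed proto.Backfill changes call-site semantics slightly. With a defined type, untyped string literals still convert implicitly, but string variables no longer do, so arbitrary strings can't silently flow into task-type parameters. A small illustrative sketch (names hypothetical, not from this diff):

package sketch

type TaskType string

func register(t TaskType) {}

func demo() {
	var fromConfig = "backfill" // an ordinary string variable

	register("backfill") // still compiles: untyped constants convert implicitly
	// register(fromConfig)        // compile error: cannot use fromConfig (string) as TaskType
	register(TaskType(fromConfig)) // an explicit conversion documents the intent
}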
6 changes: 3 additions & 3 deletions pkg/ddl/ddl.go
@@ -692,7 +692,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 		ddlJobCh: make(chan struct{}, 100),
 	}
 
-	scheduler.RegisterTaskType(BackfillTaskType,
+	scheduler.RegisterTaskType(proto.Backfill,
 		func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler {
 			return newBackfillDistScheduler(ctx, id, task, taskTable, d)
 		}, scheduler.WithSummary,
@@ -702,11 +702,11 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 	if err != nil {
 		logutil.BgLogger().Warn("NewBackfillingDispatcherExt failed", zap.String("category", "ddl"), zap.Error(err))
 	} else {
-		dispatcher.RegisterDispatcherFactory(BackfillTaskType,
+		dispatcher.RegisterDispatcherFactory(proto.Backfill,
 			func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher {
 				return newLitBackfillDispatcher(ctx, taskMgr, serverID, task, backFillDsp)
 			})
-		dispatcher.RegisterDispatcherCleanUpFactory(BackfillTaskType, newBackfillCleanUpS3)
+		dispatcher.RegisterDispatcherCleanUpFactory(proto.Backfill, newBackfillCleanUpS3)
 	}
 
 	// Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`.
44 changes: 23 additions & 21 deletions pkg/ddl/ddl_worker.go
@@ -382,32 +382,34 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
 		startTS = txn.StartTS()
 		return nil
 	})
-	if err == nil {
-		jobTasks := make([]*model.Job, 0, len(tasks))
-		for i, task := range tasks {
-			job := task.job
-			job.Version = currentVersion
-			job.StartTS = startTS
-			job.ID = ids[i]
-			setJobStateToQueueing(job)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
-			if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) {
-				if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
-					logutil.BgLogger().Warn("pause user DDL by system failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err))
-					task.cacheErr = err
-					continue
-				}
-				logutil.BgLogger().Info("pause user DDL by system successful", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job))
-			}
+	jobTasks := make([]*model.Job, 0, len(tasks))
+	for i, task := range tasks {
+		job := task.job
+		job.Version = currentVersion
+		job.StartTS = startTS
+		job.ID = ids[i]
+		setJobStateToQueueing(job)
 
-			jobTasks = append(jobTasks, job)
-			injectModifyJobArgFailPoint(job)
+		if d.stateSyncer.IsUpgradingState() && !hasSysDB(job) {
+			if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
+				logutil.BgLogger().Warn("pause user DDL by system failed", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job), zap.Error(err))
+				task.cacheErr = err
+				continue
+			}
+			logutil.BgLogger().Info("pause user DDL by system successful", zap.String("category", "ddl-upgrading"), zap.Stringer("job", job))
+		}
 
-		se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
-		err = insertDDLJobs2Table(sess.NewSession(se), true, jobTasks...)
+		jobTasks = append(jobTasks, job)
+		injectModifyJobArgFailPoint(job)
 	}
-	return errors.Trace(err)
 
+	se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+
+	return errors.Trace(insertDDLJobs2Table(sess.NewSession(se), true, jobTasks...))
 }
 
 func injectFailPointForGetJob(job *model.Job) {
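Beyond the mechanical type changes, this hunk restructures addBatchDDLJobs2Table into guard-clause style: the transaction error is returned immediately, the job-building loop moves to the top level of the function, and the final insert's error is returned directly instead of being threaded through a shared err. The shape of the refactor, reduced to a self-contained sketch with stand-in helpers:

package sketch

import "github.com/pingcap/errors"

type job struct{} // stand-in for *model.Job

func insert(jobs []job) error { return nil } // stand-in for insertDDLJobs2Table

// before: the happy path is nested under the error check, and one err
// variable carries two unrelated failure sources.
func addBatchBefore(err error, jobs []job) error {
	if err == nil {
		err = insert(jobs)
	}
	return errors.Trace(err)
}

// after: fail fast, keep the main path unindented, return the insert
// error directly.
func addBatchAfter(err error, jobs []job) error {
	if err != nil {
		return errors.Trace(err)
	}
	return errors.Trace(insert(jobs))
}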
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -2049,7 +2049,7 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
 		return errors.New("do not support merge index")
 	}
 
-	taskType := BackfillTaskType
+	taskType := proto.Backfill
 	taskKey := fmt.Sprintf("ddl/%s/%d", taskType, reorgInfo.Job.ID)
 	g, ctx := errgroup.WithContext(context.Background())
 	done := make(chan struct{})
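With taskType now a proto.TaskType instead of a plain string, the fmt.Sprintf("ddl/%s/%d", ...) above keeps producing the same task key: %s formats string-underlying types (and fmt.Stringer implementations) unchanged. A quick self-contained check, assuming the constant kept the old value:

package main

import "fmt"

type TaskType string

const Backfill TaskType = "backfill" // assumed equal to the removed BackfillTaskType

func main() {
	fmt.Printf("ddl/%s/%d\n", Backfill, 123) // prints "ddl/backfill/123": existing keys stay compatible
}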
58 changes: 29 additions & 29 deletions pkg/disttask/framework/dispatcher/dispatcher.go
@@ -56,9 +56,9 @@ var (
 // Then we can use dispatcher's function in Dispatcher interface.
 type TaskHandle interface {
 	// GetPreviousSchedulerIDs gets previous scheduler IDs.
-	GetPreviousSchedulerIDs(_ context.Context, taskID int64, step int64) ([]string, error)
+	GetPreviousSchedulerIDs(_ context.Context, taskID int64, step proto.Step) ([]string, error)
 	// GetPreviousSubtaskMetas gets previous subtask metas.
-	GetPreviousSubtaskMetas(taskID int64, step int64) ([][]byte, error)
+	GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error)
 	storage.SessionExecutor
 }

@@ -105,7 +105,7 @@ var MockOwnerChange func()
 // NewBaseDispatcher creates a new BaseDispatcher.
 func NewBaseDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) *BaseDispatcher {
 	logCtx := logutil.WithFields(context.Background(), zap.Int64("task-id", task.ID),
-		zap.String("task-type", task.Type))
+		zap.Stringer("task-type", task.Type))
 	return &BaseDispatcher{
 		ctx:     ctx,
 		taskMgr: taskMgr,
Expand All @@ -128,7 +128,7 @@ func (*BaseDispatcher) Init() error {
// ExecuteTask implements the Dispatcher interface.
func (d *BaseDispatcher) ExecuteTask() {
logutil.Logger(d.logCtx).Info("execute one task",
zap.String("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency))
zap.Stringer("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency))
d.scheduleTask()
}

@@ -214,7 +214,7 @@ func (d *BaseDispatcher) scheduleTask() {
 			err = d.onRunning()
 		case proto.TaskStateSucceed, proto.TaskStateReverted, proto.TaskStateFailed:
 			if err := d.onFinished(); err != nil {
-				logutil.Logger(d.logCtx).Error("schedule task meet error", zap.String("state", d.Task.State), zap.Error(err))
+				logutil.Logger(d.logCtx).Error("schedule task meet error", zap.Stringer("state", d.Task.State), zap.Error(err))
 			}
 			return
 		}
@@ -235,14 +235,14 @@ func (d *BaseDispatcher) scheduleTask() {
 
 // handle task in cancelling state, dispatch revert subtasks.
 func (d *BaseDispatcher) onCancelling() error {
-	logutil.Logger(d.logCtx).Info("on cancelling state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Info("on cancelling state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	errs := []error{errors.New("cancel")}
 	return d.onErrHandlingStage(errs)
 }
 
 // handle task in pausing state, cancel all running subtasks.
 func (d *BaseDispatcher) onPausing() error {
-	logutil.Logger(d.logCtx).Info("on pausing state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Info("on pausing state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStateRunning, proto.TaskStatePending)
 	if err != nil {
 		logutil.Logger(d.logCtx).Warn("check task failed", zap.Error(err))
@@ -252,7 +252,7 @@ func (d *BaseDispatcher) onPausing() error {
 		logutil.Logger(d.logCtx).Info("all running subtasks paused, update the task to paused state")
 		return d.updateTask(proto.TaskStatePaused, nil, RetrySQLTimes)
 	}
-	logutil.Logger(d.logCtx).Debug("on pausing state, this task keeps current state", zap.String("state", d.Task.State))
+	logutil.Logger(d.logCtx).Debug("on pausing state, this task keeps current state", zap.Stringer("state", d.Task.State))
 	return nil
 }

@@ -261,7 +261,7 @@ var MockDMLExecutionOnPausedState func(task *proto.Task)
 
 // handle task in paused state
 func (d *BaseDispatcher) onPaused() error {
-	logutil.Logger(d.logCtx).Info("on paused state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Info("on paused state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	failpoint.Inject("mockDMLExecutionOnPausedState", func(val failpoint.Value) {
 		if val.(bool) {
 			MockDMLExecutionOnPausedState(d.Task)
@@ -275,7 +275,7 @@ var TestSyncChan = make(chan struct{})
 
 // handle task in resuming state
 func (d *BaseDispatcher) onResuming() error {
-	logutil.Logger(d.logCtx).Info("on resuming state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Info("on resuming state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStatePaused)
 	if err != nil {
 		logutil.Logger(d.logCtx).Warn("check task failed", zap.Error(err))
@@ -296,7 +296,7 @@ func (d *BaseDispatcher) onResuming() error {
 
 // handle task in reverting state, check all revert subtasks finished.
 func (d *BaseDispatcher) onReverting() error {
-	logutil.Logger(d.logCtx).Debug("on reverting state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Debug("on reverting state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStateRevertPending, proto.TaskStateReverting)
 	if err != nil {
 		logutil.Logger(d.logCtx).Warn("check task failed", zap.Error(err))
@@ -309,20 +309,20 @@ func (d *BaseDispatcher) onReverting() error {
 	}
 	// Wait all subtasks in this stage finished.
 	d.OnTick(d.ctx, d.Task)
-	logutil.Logger(d.logCtx).Debug("on reverting state, this task keeps current state", zap.String("state", d.Task.State))
+	logutil.Logger(d.logCtx).Debug("on reverting state, this task keeps current state", zap.Stringer("state", d.Task.State))
 	return nil
 }
 
 // handle task in pending state, dispatch subtasks.
 func (d *BaseDispatcher) onPending() error {
-	logutil.Logger(d.logCtx).Debug("on pending state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Debug("on pending state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	return d.onNextStage()
 }
 
 // handle task in running state, check all running subtasks finished.
 // If subtasks finished, run into the next stage.
 func (d *BaseDispatcher) onRunning() error {
-	logutil.Logger(d.logCtx).Debug("on running state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
+	logutil.Logger(d.logCtx).Debug("on running state", zap.Stringer("state", d.Task.State), zap.Int64("stage", int64(d.Task.Step)))
 	subTaskErrs, err := d.taskMgr.CollectSubTaskError(d.Task.ID)
 	if err != nil {
 		logutil.Logger(d.logCtx).Warn("collect subtask error failed", zap.Error(err))
@@ -348,13 +348,13 @@ func (d *BaseDispatcher) onRunning() error {
 	}
 	// Wait all subtasks in this stage finished.
 	d.OnTick(d.ctx, d.Task)
-	logutil.Logger(d.logCtx).Debug("on running state, this task keeps current state", zap.String("state", d.Task.State))
+	logutil.Logger(d.logCtx).Debug("on running state, this task keeps current state", zap.Stringer("state", d.Task.State))
 	return nil
 }
 
 func (d *BaseDispatcher) onFinished() error {
 	metrics.UpdateMetricsForFinishTask(d.Task)
-	logutil.Logger(d.logCtx).Debug("schedule task, task is finished", zap.String("state", d.Task.State))
+	logutil.Logger(d.logCtx).Debug("schedule task, task is finished", zap.Stringer("state", d.Task.State))
 	return d.taskMgr.TransferSubTasks2History(d.Task.ID)
 }

@@ -418,10 +418,10 @@ func (d *BaseDispatcher) replaceDeadNodesIfAny() error {
 }
 
 // updateTask update the task in tidb_global_task table.
-func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
+func (d *BaseDispatcher) updateTask(taskState proto.TaskState, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
 	prevState := d.Task.State
 	d.Task.State = taskState
-	logutil.BgLogger().Info("task state transform", zap.String("from", prevState), zap.String("to", taskState))
+	logutil.BgLogger().Info("task state transform", zap.Stringer("from", prevState), zap.Stringer("to", taskState))
 	if !VerifyTaskStateTransform(prevState, taskState) {
 		return errors.Errorf("invalid task state transform, from %s to %s", prevState, taskState)
 	}
@@ -440,14 +440,14 @@ func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
 			break
 		}
 		if i%10 == 0 {
-			logutil.Logger(d.logCtx).Warn("updateTask first failed", zap.String("from", prevState), zap.String("to", d.Task.State),
+			logutil.Logger(d.logCtx).Warn("updateTask first failed", zap.Stringer("from", prevState), zap.Stringer("to", d.Task.State),
 				zap.Int("retry times", i), zap.Error(err))
 		}
 		time.Sleep(RetrySQLInterval)
 	}
 	if err != nil && retryTimes != nonRetrySQLTime {
 		logutil.Logger(d.logCtx).Warn("updateTask failed",
-			zap.String("from", prevState), zap.String("to", d.Task.State), zap.Int("retry times", retryTimes), zap.Error(err))
+			zap.Stringer("from", prevState), zap.Stringer("to", d.Task.State), zap.Int("retry times", retryTimes), zap.Error(err))
 	}
 	return err
 }
@@ -494,8 +494,8 @@ func (d *BaseDispatcher) onNextStage() (err error) {
 
 	nextStep := d.GetNextStep(d, d.Task)
 	logutil.Logger(d.logCtx).Info("onNextStage",
-		zap.Int64("current-step", d.Task.Step),
-		zap.Int64("next-step", nextStep))
+		zap.Int64("current-step", int64(d.Task.Step)),
+		zap.Int64("next-step", int64(nextStep)))
 
 	// 1. Adjust the global task's concurrency.
 	if d.Task.State == proto.TaskStatePending {
@@ -529,7 +529,7 @@ func (d *BaseDispatcher) onNextStage() (err error) {
 		logutil.Logger(d.logCtx).Info("all subtasks dispatched and processed, finish the task")
 	} else {
 		logutil.Logger(d.logCtx).Info("move to next stage",
-			zap.Int64("from", currStep), zap.Int64("to", d.Task.Step))
+			zap.Int64("from", int64(currStep)), zap.Int64("to", int64(d.Task.Step)))
 	}
 	d.Task.StateUpdateTime = time.Now().UTC()
 	err = d.updateTask(taskState, nil, RetrySQLTimes)
@@ -565,8 +565,8 @@ func (d *BaseDispatcher) onNextStage() (err error) {
 	return nil
 }
 
-func (d *BaseDispatcher) dispatchSubTask(subtaskStep int64, metas [][]byte) error {
-	logutil.Logger(d.logCtx).Info("dispatch subtasks", zap.String("state", d.Task.State), zap.Int64("step", d.Task.Step), zap.Uint64("concurrency", d.Task.Concurrency), zap.Int("subtasks", len(metas)))
+func (d *BaseDispatcher) dispatchSubTask(subtaskStep proto.Step, metas [][]byte) error {
+	logutil.Logger(d.logCtx).Info("dispatch subtasks", zap.Stringer("state", d.Task.State), zap.Int64("step", int64(d.Task.Step)), zap.Uint64("concurrency", d.Task.Concurrency), zap.Int("subtasks", len(metas)))
 
 	// select all available TiDB nodes for task.
 	serverNodes, err := d.GetEligibleInstances(d.ctx, d.Task)
@@ -603,7 +603,7 @@ func (d *BaseDispatcher) dispatchSubTask(subtaskStep int64, metas [][]byte) error {
 }
 
 func (d *BaseDispatcher) handlePlanErr(err error) error {
-	logutil.Logger(d.logCtx).Warn("generate plan failed", zap.Error(err), zap.String("state", d.Task.State))
+	logutil.Logger(d.logCtx).Warn("generate plan failed", zap.Error(err), zap.Stringer("state", d.Task.State))
 	if d.IsRetryableErr(err) {
 		return err
 	}
@@ -683,10 +683,10 @@ func (d *BaseDispatcher) GetAllSchedulerIDs(ctx context.Context, task *proto.Task)
 }
 
 // GetPreviousSubtaskMetas get subtask metas from specific step.
-func (d *BaseDispatcher) GetPreviousSubtaskMetas(taskID int64, step int64) ([][]byte, error) {
+func (d *BaseDispatcher) GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error) {
 	previousSubtasks, err := d.taskMgr.GetSucceedSubtasksByStep(taskID, step)
 	if err != nil {
-		logutil.Logger(d.logCtx).Warn("get previous succeed subtask failed", zap.Int64("step", step))
+		logutil.Logger(d.logCtx).Warn("get previous succeed subtask failed", zap.Int64("step", int64(step)))
 		return nil, err
 	}
 	previousSubtaskMetas := make([][]byte, 0, len(previousSubtasks))
@@ -697,7 +697,7 @@ func (d *BaseDispatcher) GetPreviousSubtaskMetas(taskID int64, step int64) ([][]byte, error) {
 }
 
 // GetPreviousSchedulerIDs gets scheduler IDs that run previous step.
-func (d *BaseDispatcher) GetPreviousSchedulerIDs(_ context.Context, taskID int64, step int64) ([]string, error) {
+func (d *BaseDispatcher) GetPreviousSchedulerIDs(_ context.Context, taskID int64, step proto.Step) ([]string, error) {
 	return d.taskMgr.GetSchedulerIDsByTaskIDAndStep(taskID, step)
 }

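The logging edits in this file follow one consistent rule: TaskState and TaskType fields move from zap.String to zap.Stringer, which only works because those types implement fmt.Stringer, while Step stays on zap.Int64 with an explicit int64(...) conversion, consistent with an integer-backed type that has no String() method. A minimal sketch of the pattern (type definitions assumed, matching the sketch near the top of this page):

package main

import "go.uber.org/zap"

type TaskState string

// String satisfies fmt.Stringer; zap renders it lazily, only if the entry is actually logged.
func (s TaskState) String() string { return string(s) }

type Step int64 // no String() method, hence the explicit int64 conversions

func main() {
	logger, _ := zap.NewDevelopment()
	defer func() { _ = logger.Sync() }()

	state, step := TaskState("running"), Step(1)
	logger.Info("on running state",
		zap.Stringer("state", state),    // was: zap.String("state", ...) on a plain string
		zap.Int64("stage", int64(step))) // was: zap.Int64("stage", ...) on a plain int64
}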
(The remaining 31 of the 38 changed files were not loaded in this view.)