disttask: update task in framework #46913

Merged
merged 13 commits
Sep 14, 2023

Changes from 4 commits
3 changes: 0 additions & 3 deletions ddl/backfilling_dispatcher.go
@@ -78,9 +78,6 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(ctx context.Context,
// Only redact when the task is complete.
if len(taskMeta) == 0 && useExtStore {
redactCloudStorageURI(ctx, gTask, &gTaskMeta)
if err := taskHandle.UpdateTask(gTask.State, nil, dispatcher.RetrySQLTimes); err != nil {
logutil.Logger(ctx).Error("failed to UpdateTask", zap.Error(err))
}
}
}()

41 changes: 9 additions & 32 deletions disttask/framework/dispatcher/dispatcher.go
@@ -57,8 +57,6 @@ var (
type TaskHandle interface {
// GetPreviousSubtaskMetas gets previous subtask metas.
GetPreviousSubtaskMetas(taskID int64, step int64) ([][]byte, error)
// UpdateTask update the task in tidb_global_task table.
UpdateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) error
storage.SessionExecutor
}

@@ -203,7 +201,7 @@ func (d *BaseDispatcher) onReverting() error {
if cnt == 0 {
// Finish the rollback step.
logutil.Logger(d.logCtx).Info("all reverting tasks finished, update the task to reverted state")
return d.UpdateTask(proto.TaskStateReverted, nil, RetrySQLTimes)
return d.updateTask(proto.TaskStateReverted, nil, RetrySQLTimes)
}
// Wait all subtasks in this stage finished.
d.OnTick(d.ctx, d.Task)
@@ -243,7 +241,7 @@ func (d *BaseDispatcher) onRunning() error {
if d.Finished(d.Task) {
d.Task.StateUpdateTime = time.Now().UTC()
logutil.Logger(d.logCtx).Info("all subtasks dispatched and processed, finish the task")
err := d.UpdateTask(proto.TaskStateSucceed, nil, RetrySQLTimes)
err := d.updateTask(proto.TaskStateSucceed, nil, RetrySQLTimes)
if err != nil {
logutil.Logger(d.logCtx).Warn("update task failed", zap.Error(err))
return err
@@ -320,29 +318,8 @@ func (d *BaseDispatcher) replaceDeadNodesIfAny() error {
return nil
}

func (d *BaseDispatcher) addSubtasks(subtasks []*proto.Subtask) (err error) {
for i := 0; i < RetrySQLTimes; i++ {
err = d.taskMgr.AddSubTasks(d.Task, subtasks)
if err == nil {
break
}
if i%10 == 0 {
logutil.Logger(d.logCtx).Warn("addSubtasks failed", zap.String("state", d.Task.State), zap.Int64("step", d.Task.Step),
zap.Int("subtask cnt", len(subtasks)),
zap.Int("retry times", i), zap.Error(err))
}
time.Sleep(RetrySQLInterval)
}
if err != nil {
logutil.Logger(d.logCtx).Warn("addSubtasks failed", zap.String("state", d.Task.State), zap.Int64("step", d.Task.Step),
zap.Int("subtask cnt", len(subtasks)),
zap.Int("retry times", RetrySQLTimes), zap.Error(err))
}
return err
}

// UpdateTask update the task in tidb_global_task table.
func (d *BaseDispatcher) UpdateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
// updateTask update the task in tidb_global_task table.
func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
prevState := d.Task.State
d.Task.State = taskState
if !VerifyTaskStateTransform(prevState, taskState) {
@@ -398,7 +375,7 @@ func (d *BaseDispatcher) dispatchSubTask4Revert(meta []byte) error {
for _, id := range instanceIDs {
subTasks = append(subTasks, proto.NewSubtask(d.Task.ID, d.Task.Type, id, meta))
}
return d.UpdateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
}

func (d *BaseDispatcher) onNextStage() error {
@@ -416,15 +393,15 @@ func (d *BaseDispatcher) onNextStage() error {
d.Task.Concurrency = MaxSubtaskConcurrency
}
d.Task.StateUpdateTime = time.Now().UTC()
if err := d.UpdateTask(proto.TaskStateRunning, nil, RetrySQLTimes); err != nil {
if err := d.updateTask(proto.TaskStateRunning, nil, RetrySQLTimes); err != nil {
return err
}
} else if d.StageFinished(d.Task) {
// 2. when previous stage finished, update to next stage.
d.Task.Step++
logutil.Logger(d.logCtx).Info("previous stage finished, run into next stage", zap.Int64("from", d.Task.Step-1), zap.Int64("to", d.Task.Step))
d.Task.StateUpdateTime = time.Now().UTC()
err := d.UpdateTask(proto.TaskStateRunning, nil, RetrySQLTimes)
err := d.updateTask(proto.TaskStateRunning, nil, RetrySQLTimes)
if err != nil {
return err
}
@@ -493,7 +470,7 @@ func (d *BaseDispatcher) dispatchSubTask(metas [][]byte) error {
logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(d.Task.ID, d.Task.Type, instanceID, meta))
}
return d.addSubtasks(subTasks)
return d.updateTask(d.Task.State, subTasks, RetrySQLTimes)
}

func (d *BaseDispatcher) handlePlanErr(err error) error {
@@ -503,7 +480,7 @@ func (d *BaseDispatcher) handlePlanErr(err error) error {
}
d.Task.Error = err
// state transform: pending -> failed.
return d.UpdateTask(proto.TaskStateFailed, nil, RetrySQLTimes)
return d.updateTask(proto.TaskStateFailed, nil, RetrySQLTimes)
}

// GenerateSchedulerNodes generate a eligible TiDB nodes.
35 changes: 21 additions & 14 deletions disttask/framework/storage/task_table.go
@@ -640,32 +640,39 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas
return err
}
if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
retryable = false
return errors.New("invalid task state transform, state already changed")
rs, err := ExecSQL(stm.ctx, se, "select id from mysql.tidb_global_task where id = %? and state = %?", gTask.ID, prevState)
Contributor Author:

Maybe the UPDATE doesn't change gTask at all (currently add index / import into always change gTask), so when it affects no rows we need to check whether the task state has been changed by an admin cancel.

Contributor:

Why query it again when StmtCtx.AffectedRows() is 0?

Contributor Author:

> Why query it again when StmtCtx.AffectedRows() is 0?

If some onNextStage call doesn't actually change gTask, AffectedRows() returns 0. But as long as the state wasn't changed by another admin command, it is still safe to insert the subtasks. This case does not occur for the current add index and import into.
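
A minimal sketch of the pattern discussed in this thread, using plain database/sql instead of the patch's TaskManager/ExecSQL helpers; updateTaskState and its single-column UPDATE are illustrative assumptions, not the real implementation:

```go
package taskstate

import (
	"database/sql"
	"errors"
)

// errStateChanged mirrors the "state already changed" error in the patch.
var errStateChanged = errors.New("invalid task state transform, state already changed")

// updateTaskState guards the UPDATE with the previous state. MySQL reports
// zero affected rows both when no row matched and when the matched row already
// holds the new values, so a follow-up SELECT is needed to tell a harmless
// no-op apart from a concurrent state change (e.g. an admin cancel).
func updateTaskState(db *sql.DB, taskID int64, prevState, newState string) error {
	res, err := db.Exec(
		"UPDATE mysql.tidb_global_task SET state = ? WHERE id = ? AND state = ?",
		newState, taskID, prevState)
	if err != nil {
		return err
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if affected > 0 {
		return nil
	}
	// No rows changed: check whether the task is still in prevState.
	var id int64
	err = db.QueryRow(
		"SELECT id FROM mysql.tidb_global_task WHERE id = ? AND state = ?",
		taskID, prevState).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		// Another session moved the task out of prevState (e.g. admin cancel).
		return errStateChanged
	}
	if err != nil {
		return err
	}
	// The row is untouched and still in prevState; it is safe to go on and
	// insert the subtasks.
	return nil
}
```

The SELECT only runs on the zero-affected-rows path, so the common case stays a single statement.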

if err != nil {
return err
}
// state have changed.
if len(rs) == 0 {
retryable = false
return errors.New("invalid task state transform, state already changed")
}
}

failpoint.Inject("MockUpdateTaskErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("updateTaskErr"))
}
})
if len(subtasks) > 0 {
subtaskState := proto.TaskStatePending
if gTask.State == proto.TaskStateReverting {
subtaskState = proto.TaskStateRevertPending
}

subtaskState := proto.TaskStatePending
if gTask.State == proto.TaskStateReverting {
subtaskState = proto.TaskStateRevertPending
}

for _, subtask := range subtasks {
// TODO: insert subtasks in batch
_, err = ExecSQL(stm.ctx, se, `insert into mysql.tidb_background_subtask
for _, subtask := range subtasks {
// TODO: insert subtasks in batch
_, err = ExecSQL(stm.ctx, se, `insert into mysql.tidb_background_subtask
(step, task_key, exec_id, meta, state, type, checkpoint, summary)
values (%?, %?, %?, %?, %?, %?, %?, %?)`,
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}")
if err != nil {
return err
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}")
if err != nil {
return err
}
}
}

return nil
})

9 changes: 0 additions & 9 deletions disttask/importinto/dispatcher.go
@@ -248,9 +248,6 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(ctx context.Context, taskHan
if err := updateResult(taskHandle, gTask, taskMeta); err != nil {
return nil, err
}
if err := taskHandle.UpdateTask(gTask.State, nil, dispatcher.RetrySQLTimes); err != nil {
return nil, err
}
logger.Info("move to post-process step ", zap.Any("result", taskMeta.Result))
case StepPostProcess + 1:
return nil, nil
@@ -572,9 +569,6 @@ func (dsp *ImportDispatcherExt) finishJob(ctx context.Context, logger *zap.Logge
taskHandle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) error {
dsp.unregisterTask(ctx, gTask)
redactSensitiveInfo(gTask, taskMeta)
if err := taskHandle.UpdateTask(gTask.State, nil, dispatcher.RetrySQLTimes); err != nil {
return err
}
summary := &importer.JobSummary{ImportedRows: taskMeta.Result.LoadedRowCnt}
// retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes
backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval)
@@ -593,9 +587,6 @@ func (dsp *ImportDispatcherExt) failJob(ctx context.Context, taskHandle dispatch
dsp.switchTiKV2NormalMode(ctx, gTask, logger)
dsp.unregisterTask(ctx, gTask)
redactSensitiveInfo(gTask, taskMeta)
if err := taskHandle.UpdateTask(gTask.State, nil, dispatcher.RetrySQLTimes); err != nil {
return err
}
// retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes
backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval)
return handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger,