Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: fix wrongly add subtask when state transform fail #46259

Merged
merged 5 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
Expand Down
25 changes: 14 additions & 11 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,16 @@
return errors.Errorf("invalid task state transform, from %s to %s", prevState, taskState)
}

failpoint.Inject("cancelBeforeUpdate", func() {
err := d.taskMgr.CancelGlobalTask(d.task.ID)
if err != nil {
logutil.Logger(d.logCtx).Error("cancel task failed", zap.Error(err))
}

Check warning on line 226 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L225-L226

Added lines #L225 - L226 were not covered by tests
})
var retryable bool
for i := 0; i < retryTimes; i++ {
err = d.taskMgr.UpdateGlobalTaskAndAddSubTasks(d.task, newSubTasks, prevState)
if err == nil {
retryable, err = d.taskMgr.UpdateGlobalTaskAndAddSubTasks(d.task, newSubTasks, prevState)
if err == nil || !retryable {
break
}
if i%10 == 0 {
Expand Down Expand Up @@ -262,10 +269,6 @@
return err
}

if len(instanceIDs) == 0 {
return d.updateTask(proto.TaskStateReverted, nil, retrySQLTimes)
}

subTasks := make([]*proto.Subtask, 0, len(instanceIDs))
for _, id := range instanceIDs {
subTasks = append(subTasks, proto.NewSubtask(task.ID, task.Type, id, meta))
Expand Down Expand Up @@ -343,7 +346,6 @@
logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(task.ID, task.Type, instanceID, meta))
}

return d.updateTask(proto.TaskStateRunning, subTasks, retrySQLTimes)
}

Expand Down Expand Up @@ -412,7 +414,7 @@
}

// VerifyTaskStateTransform verifies whether the task state transform is valid.
func VerifyTaskStateTransform(oldState, newState string) bool {
func VerifyTaskStateTransform(from, to string) bool {
rules := map[string][]string{
proto.TaskStatePending: {
proto.TaskStateRunning,
Expand Down Expand Up @@ -454,13 +456,14 @@
proto.TaskStateRevertPending: {},
proto.TaskStateReverted: {},
}
logutil.BgLogger().Info("task state transform", zap.String("from", from), zap.String("to", to))

if oldState == newState {
if from == to {
return true
}

for _, state := range rules[oldState] {
if state == newState {
for _, state := range rules[from] {
if state == to {
return true
}
}
Expand Down
12 changes: 12 additions & 0 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,15 @@ func TestOwnerChange(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/dispatcher/mockOwnerChange"))
distContext.Close()
}

// TestFrameworkCancelThenSubmitSubTask runs a dispatch with the dispatcher's
// cancelBeforeUpdate failpoint enabled, which cancels the global task just
// before the dispatcher tries to update it and add subtasks.
// The dispatch is driven through DispatchTaskAndCheckFail (presumably asserting
// that the task ends in a failed state — confirm against the helper).
func TestFrameworkCancelThenSubmitSubTask(t *testing.T) {
	defer dispatcher.ClearTaskFlowHandle()
	defer scheduler.ClearSchedulers()

	const cancelFailpoint = "github.com/pingcap/tidb/disttask/framework/dispatcher/cancelBeforeUpdate"

	var taskMeta sync.Map
	RegisterTaskMeta(&taskMeta)
	distContext := testkit.NewDistExecutionContext(t, 3)

	// Arm the failpoint only around the dispatch under test.
	enableErr := failpoint.Enable(cancelFailpoint, "return()")
	require.NoError(t, enableErr)

	DispatchTaskAndCheckFail("😊", t, &taskMeta)

	disableErr := failpoint.Disable(cancelFailpoint)
	require.NoError(t, disableErr)

	distContext.Close()
}
12 changes: 8 additions & 4 deletions disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func TestGlobalTaskTable(t *testing.T) {

prevState := task.State
task.State = proto.TaskStateRunning
err = gm.UpdateGlobalTaskAndAddSubTasks(task, nil, prevState)
retryable, err := gm.UpdateGlobalTaskAndAddSubTasks(task, nil, prevState)
require.NoError(t, err)
require.Equal(t, true, retryable)

task5, err := gm.GetGlobalTasksInStates(proto.TaskStateRunning)
require.NoError(t, err)
Expand Down Expand Up @@ -253,8 +254,9 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) {
Meta: []byte("m2"),
},
}
err = sm.UpdateGlobalTaskAndAddSubTasks(task, subTasks, prevState)
retryable, err := sm.UpdateGlobalTaskAndAddSubTasks(task, subTasks, prevState)
require.NoError(t, err)
require.Equal(t, true, retryable)

task, err = sm.GetGlobalTaskByID(1)
require.NoError(t, err)
Expand Down Expand Up @@ -291,8 +293,9 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) {
Meta: []byte("m4"),
},
}
err = sm.UpdateGlobalTaskAndAddSubTasks(task, subTasks, prevState)
retryable, err = sm.UpdateGlobalTaskAndAddSubTasks(task, subTasks, prevState)
require.NoError(t, err)
require.Equal(t, true, retryable)

task, err = sm.GetGlobalTaskByID(1)
require.NoError(t, err)
Expand Down Expand Up @@ -322,8 +325,9 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) {
}()
prevState = task.State
task.State = proto.TaskStateFailed
err = sm.UpdateGlobalTaskAndAddSubTasks(task, subTasks, prevState)
retryable, err = sm.UpdateGlobalTaskAndAddSubTasks(task, subTasks, prevState)
require.EqualError(t, err, "updateTaskErr")
require.Equal(t, true, retryable)

task, err = sm.GetGlobalTaskByID(1)
require.NoError(t, err)
Expand Down
12 changes: 10 additions & 2 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,20 @@
}

// UpdateGlobalTaskAndAddSubTasks updates the global task and adds new subtasks.
func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtasks []*proto.Subtask, prevState string) error {
return stm.WithNewTxn(stm.ctx, func(se sessionctx.Context) error {
func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtasks []*proto.Subtask, prevState string) (bool, error) {
retryable := true

Check warning on line 472 in disttask/framework/storage/task_table.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/storage/task_table.go#L471-L472

Added lines #L471 - L472 were not covered by tests
err := stm.WithNewTxn(stm.ctx, func(se sessionctx.Context) error {
_, err := ExecSQL(stm.ctx, se, "update mysql.tidb_global_task set state = %?, dispatcher_id = %?, step = %?, state_update_time = %?, concurrency = %?, meta = %?, error = %? where id = %? and state = %?",
gTask.State, gTask.DispatcherID, gTask.Step, gTask.StateUpdateTime.UTC().String(), gTask.Concurrency, gTask.Meta, serializeErr(gTask.Error), gTask.ID, prevState)
if err != nil {
return err
}

if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
retryable = false

Check warning on line 481 in disttask/framework/storage/task_table.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/storage/task_table.go#L480-L481

Added lines #L480 - L481 were not covered by tests
return errors.New("invalid task state transform, state already changed")
}

failpoint.Inject("MockUpdateTaskErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.New("updateTaskErr"))
Expand All @@ -498,6 +504,8 @@

return nil
})

return retryable, err

Check warning on line 508 in disttask/framework/storage/task_table.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/storage/task_table.go#L508

Added line #L508 was not covered by tests
}

func serializeErr(err error) []byte {
Expand Down