Skip to content

Commit

Permalink
disttask: fix flaky test TestParallelCancel (pingcap#55668)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Aug 26, 2024
1 parent ac9916a commit 44f100f
Showing 1 changed file with 27 additions and 22 deletions.
49 changes: 27 additions & 22 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
}()

// 3s
cnt := 60
checkGetRunningTaskCnt := func(expected int) {
require.Eventually(t, func() bool {
return sch.GetRunningTaskCnt() == expected
Expand Down Expand Up @@ -236,21 +235,16 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,

// test DetectTaskLoop
checkGetTaskState := func(expectedState proto.TaskState) {
i := 0
for ; i < cnt; i++ {
require.Eventually(t, func() bool {
tasks, err := mgr.GetTasksInStates(ctx, expectedState)
require.NoError(t, err)
if len(tasks) == taskCnt {
break
return true
}
historyTasks, err := testutil.GetTasksFromHistoryInStates(ctx, mgr, expectedState)
require.NoError(t, err)
if len(tasks)+len(historyTasks) == taskCnt {
break
}
time.Sleep(time.Millisecond * 50)
}
require.Less(t, i, cnt)
return len(tasks)+len(historyTasks) == taskCnt
}, 10*time.Second, 100*time.Millisecond)
}
// Test all subtasks are successful.
var err error
Expand All @@ -267,9 +261,16 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
return
}

subtasksMap := make(map[int64][]*proto.SubtaskBase, len(taskIDs))
for _, taskID := range taskIDs {
subtasks, err := mgr.GetActiveSubtasks(ctx, taskID)
require.NoError(t, err)
subtasksMap[taskID] = subtasks
}

if isCancel {
for i := 1; i <= taskCnt; i++ {
err = mgr.CancelTask(ctx, int64(i))
for _, taskID := range taskIDs {
err = mgr.CancelTask(ctx, taskID)
require.NoError(t, err)
}
} else if isPauseAndResume {
Expand All @@ -278,9 +279,11 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
require.True(t, found)
require.NoError(t, err)
}
for i := 1; i <= subtaskCnt*taskCnt; i++ {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStatePaused, nil)
require.NoError(t, err)
for _, sts := range subtasksMap {
for _, st := range sts {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", st.ID, proto.SubtaskStatePaused, nil)
require.NoError(t, err)
}
}
checkGetTaskState(proto.TaskStatePaused)
for i := 0; i < taskCnt; i++ {
Expand All @@ -290,23 +293,25 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
}

// Mock subtasks succeed.
for i := 1; i <= subtaskCnt*taskCnt; i++ {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateSucceed, nil)
require.NoError(t, err)
for _, sts := range subtasksMap {
for _, st := range sts {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", st.ID, proto.SubtaskStateSucceed, nil)
require.NoError(t, err)
}
}
checkGetTaskState(proto.TaskStateSucceed)
return
} else {
if isSubtaskCancel {
// Mock a subtask canceled
for i := 1; i <= subtaskCnt*taskCnt; i += subtaskCnt {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateCanceled, nil)
for _, sts := range subtasksMap {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", sts[0].ID, proto.SubtaskStateCanceled, nil)
require.NoError(t, err)
}
} else {
// Mock a subtask fails.
for i := 1; i <= subtaskCnt*taskCnt; i += subtaskCnt {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateFailed, nil)
for _, sts := range subtasksMap {
err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", sts[0].ID, proto.SubtaskStateFailed, nil)
require.NoError(t, err)
}
}
Expand Down

0 comments on commit 44f100f

Please sign in to comment.