From 44f100f80a7a7aa02622585ec14d6882be42735d Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 26 Aug 2024 20:46:45 +0800 Subject: [PATCH] disttask: fix flaky test TestParallelCancel (#55668) close pingcap/tidb#55658 --- .../framework/scheduler/scheduler_test.go | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/disttask/framework/scheduler/scheduler_test.go b/pkg/disttask/framework/scheduler/scheduler_test.go index bc7b5e236f52d..e6e452b9ee0a8 100644 --- a/pkg/disttask/framework/scheduler/scheduler_test.go +++ b/pkg/disttask/framework/scheduler/scheduler_test.go @@ -184,7 +184,6 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, }() // 3s - cnt := 60 checkGetRunningTaskCnt := func(expected int) { require.Eventually(t, func() bool { return sch.GetRunningTaskCnt() == expected @@ -236,21 +235,16 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, // test DetectTaskLoop checkGetTaskState := func(expectedState proto.TaskState) { - i := 0 - for ; i < cnt; i++ { + require.Eventually(t, func() bool { tasks, err := mgr.GetTasksInStates(ctx, expectedState) require.NoError(t, err) if len(tasks) == taskCnt { - break + return true } historyTasks, err := testutil.GetTasksFromHistoryInStates(ctx, mgr, expectedState) require.NoError(t, err) - if len(tasks)+len(historyTasks) == taskCnt { - break - } - time.Sleep(time.Millisecond * 50) - } - require.Less(t, i, cnt) + return len(tasks)+len(historyTasks) == taskCnt + }, 10*time.Second, 100*time.Millisecond) } // Test all subtasks are successful. var err error @@ -267,9 +261,16 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, return } + subtasksMap := make(map[int64][]*proto.SubtaskBase, len(taskIDs)) + for _, taskID := range taskIDs { + subtasks, err := mgr.GetActiveSubtasks(ctx, taskID) + require.NoError(t, err) + subtasksMap[taskID] = subtasks + } + if isCancel { - for i := 1; i <= taskCnt; i++ { - err = mgr.CancelTask(ctx, int64(i)) + for _, taskID := range taskIDs { + err = mgr.CancelTask(ctx, taskID) require.NoError(t, err) } } else if isPauseAndResume { @@ -278,9 +279,11 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, require.True(t, found) require.NoError(t, err) } - for i := 1; i <= subtaskCnt*taskCnt; i++ { - err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStatePaused, nil) - require.NoError(t, err) + for _, sts := range subtasksMap { + for _, st := range sts { + err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", st.ID, proto.SubtaskStatePaused, nil) + require.NoError(t, err) + } } checkGetTaskState(proto.TaskStatePaused) for i := 0; i < taskCnt; i++ { @@ -290,23 +293,25 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, } // Mock subtasks succeed. - for i := 1; i <= subtaskCnt*taskCnt; i++ { - err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateSucceed, nil) - require.NoError(t, err) + for _, sts := range subtasksMap { + for _, st := range sts { + err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", st.ID, proto.SubtaskStateSucceed, nil) + require.NoError(t, err) + } } checkGetTaskState(proto.TaskStateSucceed) return } else { if isSubtaskCancel { // Mock a subtask canceled - for i := 1; i <= subtaskCnt*taskCnt; i += subtaskCnt { - err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateCanceled, nil) + for _, sts := range subtasksMap { + err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", sts[0].ID, proto.SubtaskStateCanceled, nil) require.NoError(t, err) } } else { // Mock a subtask fails. - for i := 1; i <= subtaskCnt*taskCnt; i += subtaskCnt { - err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", int64(i), proto.SubtaskStateFailed, nil) + for _, sts := range subtasksMap { + err = mgr.UpdateSubtaskStateAndError(ctx, ":4000", sts[0].ID, proto.SubtaskStateFailed, nil) require.NoError(t, err) } }