diff --git a/engine/jobmaster/dm/api.go b/engine/jobmaster/dm/api.go index beb422d5c23..425e92243ee 100644 --- a/engine/jobmaster/dm/api.go +++ b/engine/jobmaster/dm/api.go @@ -33,7 +33,7 @@ type TaskStatus struct { WorkerID frameModel.WorkerID `json:"worker_id"` ConfigOutdated bool `json:"config_outdated"` Status *dmpkg.QueryStatusResponse `json:"status"` - CreatedTime time.Time `json:"created_time"` + Duration time.Duration `json:"duration"` } // JobStatus represents status of a job @@ -99,6 +99,7 @@ func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobSt cfgModRevision uint64 expectedStage metadata.TaskStage createdTime time.Time + duration time.Duration ) // task not exist @@ -122,6 +123,9 @@ func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobSt createdTime = status.CreatedTime } } + if !createdTime.IsZero() { + duration = time.Since(createdTime) + } mu.Lock() jobStatus.TaskStatus[taskID] = TaskStatus{ @@ -129,7 +133,7 @@ func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobSt WorkerID: workerID, Status: queryStatusResp, ConfigOutdated: cfgModRevision != expectedCfgModRevision, - CreatedTime: createdTime, + Duration: duration, } mu.Unlock() }() diff --git a/engine/jobmaster/dm/api_test.go b/engine/jobmaster/dm/api_test.go index d1a2da1092d..c7574e5f460 100644 --- a/engine/jobmaster/dm/api_test.go +++ b/engine/jobmaster/dm/api_test.go @@ -112,6 +112,8 @@ func TestQueryStatusAPI(t *testing.T) { dumpTime, _ = time.Parse(time.RFC3339Nano, "2022-11-04T18:47:57.43382274+08:00") loadTime, _ = time.Parse(time.RFC3339Nano, "2022-11-04T19:47:57.43382274+08:00") syncTime, _ = time.Parse(time.RFC3339Nano, "2022-11-04T20:47:57.43382274+08:00") + dumpDuration = time.Hour + loadDuration = time.Minute unitState = &metadata.UnitState{ CurrentUnitStatus: map[string]*metadata.UnitStatus{ // task1's worker not found, and current unit status is not stored @@ -133,8 +135,8 @@ func TestQueryStatusAPI(t *testing.T) { Stage: metadata.StageFinished, CfgModRevision: 3, }, - Status: dumpStatusBytes, - CreatedTime: dumpTime, + Status: dumpStatusBytes, + Duration: dumpDuration, }, &metadata.FinishedTaskStatus{ TaskStatus: metadata.TaskStatus{ @@ -143,8 +145,8 @@ func TestQueryStatusAPI(t *testing.T) { Stage: metadata.StageFinished, CfgModRevision: 3, }, - Status: loadStatusBytes, - CreatedTime: loadTime, + Status: loadStatusBytes, + Duration: loadDuration, }, }, "task7": { @@ -155,8 +157,8 @@ func TestQueryStatusAPI(t *testing.T) { Stage: metadata.StageFinished, CfgModRevision: 4, }, - Status: dumpStatusBytes, - CreatedTime: dumpTime, + Status: dumpStatusBytes, + Duration: dumpDuration, }, &metadata.FinishedTaskStatus{ TaskStatus: metadata.TaskStatus{ @@ -165,8 +167,8 @@ func TestQueryStatusAPI(t *testing.T) { Stage: metadata.StageFinished, CfgModRevision: 4, }, - Status: loadStatusBytes, - CreatedTime: loadTime, + Status: loadStatusBytes, + Duration: loadDuration, }, }, }, @@ -206,9 +208,22 @@ func TestQueryStatusAPI(t *testing.T) { require.Equal(t, &dmpkg.QueryStatusResponse{ErrorMsg: "task task8 for job not found"}, taskStatus.Status) jobStatus, err = jm.QueryJobStatus(ctx, nil) - require.NoError(t, err) + for task, currentStatus := range jobStatus.TaskStatus { + switch currentStatus.Status.Unit { + case frameModel.WorkerDMDump: + require.True(t, currentStatus.Duration-time.Since(dumpTime) < time.Second) + case frameModel.WorkerDMLoad: + require.True(t, currentStatus.Duration-time.Since(loadTime) < time.Second) + case frameModel.WorkerDMSync: + require.True(t, currentStatus.Duration-time.Since(syncTime) < time.Second) + } + // this is for passing follow test, because we can't offer the precise duration in advance + currentStatus.Duration = time.Second + jobStatus.TaskStatus[task] = currentStatus + } + expectedStatus := `{ "job_id": "dm-jobmaster-id", "task_status": { @@ -223,7 +238,7 @@ func TestQueryStatusAPI(t *testing.T) { "result": null, "status": null }, - "created_time": "0001-01-01T00:00:00Z" + "duration": 1000000000 }, "task2": { "expected_stage": "Finished", @@ -236,7 +251,7 @@ func TestQueryStatusAPI(t *testing.T) { "result": null, "status": null }, - "created_time": "2022-11-04T20:47:57.43382274+08:00" + "duration": 1000000000 }, "task3": { "expected_stage": "Finished", @@ -249,7 +264,7 @@ func TestQueryStatusAPI(t *testing.T) { "result": null, "status": null }, - "created_time": "2022-11-04T18:47:57.43382274+08:00" + "duration": 1000000000 }, "task4": { "expected_stage": "Running", @@ -270,7 +285,7 @@ func TestQueryStatusAPI(t *testing.T) { "progress": "20.00 %" } }, - "created_time": "2022-11-04T18:47:57.43382274+08:00" + "duration": 1000000000 }, "task5": { "expected_stage": "Running", @@ -292,7 +307,7 @@ func TestQueryStatusAPI(t *testing.T) { "bps": 1000 } }, - "created_time": "2022-11-04T19:47:57.43382274+08:00" + "duration": 1000000000 }, "task6": { "expected_stage": "Running", @@ -330,7 +345,7 @@ func TestQueryStatusAPI(t *testing.T) { "recentRps": 10 } }, - "created_time": "2022-11-04T20:47:57.43382274+08:00" + "duration": 1000000000 }, "task7": { "expected_stage": "Finished", @@ -343,7 +358,7 @@ func TestQueryStatusAPI(t *testing.T) { "result": null, "status": null }, - "created_time": "2022-11-04T20:47:57.43382274+08:00" + "duration": 1000000000 } }, "finished_unit_status": { @@ -363,7 +378,7 @@ func TestQueryStatusAPI(t *testing.T) { "bps": 1000, "progress": "20.00 %" }, - "CreatedTime": "2022-11-04T18:47:57.43382274+08:00" + "Duration": 3600000000000 }, { "Unit": "DMLoadTask", @@ -379,7 +394,7 @@ func TestQueryStatusAPI(t *testing.T) { "metaBinlogGTID": "1-2-3", "bps": 1000 }, - "CreatedTime": "2022-11-04T19:47:57.43382274+08:00" + "Duration": 60000000000 } ], "task7": [ @@ -398,7 +413,7 @@ func TestQueryStatusAPI(t *testing.T) { "bps": 1000, "progress": "20.00 %" }, - "CreatedTime": "2022-11-04T18:47:57.43382274+08:00" + "Duration": 3600000000000 }, { "Unit": "DMLoadTask", @@ -414,7 +429,7 @@ func TestQueryStatusAPI(t *testing.T) { "metaBinlogGTID": "1-2-3", "bps": 1000 }, - "CreatedTime": "2022-11-04T19:47:57.43382274+08:00" + "Duration": 60000000000 } ] } diff --git a/engine/jobmaster/dm/dm_jobmaster.go b/engine/jobmaster/dm/dm_jobmaster.go index fb6a38389fd..9a6c72e5fe6 100644 --- a/engine/jobmaster/dm/dm_jobmaster.go +++ b/engine/jobmaster/dm/dm_jobmaster.go @@ -213,7 +213,7 @@ func (jm *JobMaster) onWorkerFinished(finishedTaskStatus runtime.FinishedTaskSta unitStateStore := jm.metadata.UnitStateStore() err := unitStateStore.ReadModifyWrite(context.TODO(), func(state *metadata.UnitState) error { - finishedTaskStatus.CreatedTime = state.CurrentUnitStatus[taskStatus.Task].CreatedTime + finishedTaskStatus.Duration = time.Since(state.CurrentUnitStatus[taskStatus.Task].CreatedTime) for i, status := range state.FinishedUnitStatus[taskStatus.Task] { // when the unit is restarted by update-cfg or something, overwrite the old status and truncate if status.Unit == taskStatus.Unit { diff --git a/engine/jobmaster/dm/dm_jobmaster_test.go b/engine/jobmaster/dm/dm_jobmaster_test.go index e2e26c244d4..37471437f0f 100644 --- a/engine/jobmaster/dm/dm_jobmaster_test.go +++ b/engine/jobmaster/dm/dm_jobmaster_test.go @@ -430,8 +430,9 @@ func TestDuplicateFinishedState(t *testing.T) { mockBaseJobmaster.On("GetWorkers").Return(map[string]framework.WorkerHandle{}).Once() err := jm.initComponents() require.NoError(t, err) - dumpTime, _ := time.Parse(time.RFC3339Nano, "2022-11-04T18:47:57.43382274+08:00") loadTime, _ := time.Parse(time.RFC3339Nano, "2022-11-04T19:47:57.43382274+08:00") + dumpDuration := time.Hour + loadDuration := time.Hour state := &metadata.UnitState{ CurrentUnitStatus: map[string]*metadata.UnitStatus{ "task2": { @@ -449,7 +450,7 @@ func TestDuplicateFinishedState(t *testing.T) { Stage: metadata.StageFinished, CfgModRevision: 3, }, - CreatedTime: dumpTime, + Duration: dumpDuration, }, &metadata.FinishedTaskStatus{ TaskStatus: metadata.TaskStatus{ @@ -458,7 +459,7 @@ func TestDuplicateFinishedState(t *testing.T) { Stage: metadata.StageFinished, CfgModRevision: 3, }, - CreatedTime: loadTime, + Duration: loadDuration, }, }, }, @@ -486,7 +487,8 @@ func TestDuplicateFinishedState(t *testing.T) { require.Equal(t, 2, len(state2.FinishedUnitStatus["task2"])) require.Equal(t, frameModel.WorkerDMLoad, state2.FinishedUnitStatus["task2"][1].Unit) require.Equal(t, uint64(4), state2.FinishedUnitStatus["task2"][1].CfgModRevision) - require.Equal(t, loadTime, state2.FinishedUnitStatus["task2"][1].CreatedTime) + // the correct duration is that from loadTime to now + require.NotEqual(t, loadDuration, state2.FinishedUnitStatus["task2"][1].Duration) } // TODO: move to separate file diff --git a/engine/jobmaster/dm/metadata/unit_state.go b/engine/jobmaster/dm/metadata/unit_state.go index e2378ca2793..775773c03ce 100644 --- a/engine/jobmaster/dm/metadata/unit_state.go +++ b/engine/jobmaster/dm/metadata/unit_state.go @@ -95,9 +95,9 @@ type TaskStatus struct { // It only used when a task is finished. type FinishedTaskStatus struct { TaskStatus - Result *pb.ProcessResult - Status json.RawMessage - CreatedTime time.Time + Result *pb.ProcessResult + Status json.RawMessage + Duration time.Duration } // UnitStatus defines the unit status.