Skip to content

Commit

Permalink
api(engine/dm): rename CreatedTime to Duration (#7567)
Browse files Browse the repository at this point in the history
close #7343
  • Loading branch information
okJiang authored Nov 10, 2022
1 parent f4877a8 commit da92740
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 30 deletions.
8 changes: 6 additions & 2 deletions engine/jobmaster/dm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type TaskStatus struct {
WorkerID frameModel.WorkerID `json:"worker_id"`
ConfigOutdated bool `json:"config_outdated"`
Status *dmpkg.QueryStatusResponse `json:"status"`
CreatedTime time.Time `json:"created_time"`
Duration time.Duration `json:"duration"`
}

// JobStatus represents status of a job
Expand Down Expand Up @@ -99,6 +99,7 @@ func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobSt
cfgModRevision uint64
expectedStage metadata.TaskStage
createdTime time.Time
duration time.Duration
)

// task not exist
Expand All @@ -122,14 +123,17 @@ func (jm *JobMaster) QueryJobStatus(ctx context.Context, tasks []string) (*JobSt
createdTime = status.CreatedTime
}
}
if !createdTime.IsZero() {
duration = time.Since(createdTime)
}

mu.Lock()
jobStatus.TaskStatus[taskID] = TaskStatus{
ExpectedStage: expectedStage,
WorkerID: workerID,
Status: queryStatusResp,
ConfigOutdated: cfgModRevision != expectedCfgModRevision,
CreatedTime: createdTime,
Duration: duration,
}
mu.Unlock()
}()
Expand Down
55 changes: 35 additions & 20 deletions engine/jobmaster/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func TestQueryStatusAPI(t *testing.T) {
dumpTime, _ = time.Parse(time.RFC3339Nano, "2022-11-04T18:47:57.43382274+08:00")
loadTime, _ = time.Parse(time.RFC3339Nano, "2022-11-04T19:47:57.43382274+08:00")
syncTime, _ = time.Parse(time.RFC3339Nano, "2022-11-04T20:47:57.43382274+08:00")
dumpDuration = time.Hour
loadDuration = time.Minute
unitState = &metadata.UnitState{
CurrentUnitStatus: map[string]*metadata.UnitStatus{
// task1's worker not found, and current unit status is not stored
Expand All @@ -133,8 +135,8 @@ func TestQueryStatusAPI(t *testing.T) {
Stage: metadata.StageFinished,
CfgModRevision: 3,
},
Status: dumpStatusBytes,
CreatedTime: dumpTime,
Status: dumpStatusBytes,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Expand All @@ -143,8 +145,8 @@ func TestQueryStatusAPI(t *testing.T) {
Stage: metadata.StageFinished,
CfgModRevision: 3,
},
Status: loadStatusBytes,
CreatedTime: loadTime,
Status: loadStatusBytes,
Duration: loadDuration,
},
},
"task7": {
Expand All @@ -155,8 +157,8 @@ func TestQueryStatusAPI(t *testing.T) {
Stage: metadata.StageFinished,
CfgModRevision: 4,
},
Status: dumpStatusBytes,
CreatedTime: dumpTime,
Status: dumpStatusBytes,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Expand All @@ -165,8 +167,8 @@ func TestQueryStatusAPI(t *testing.T) {
Stage: metadata.StageFinished,
CfgModRevision: 4,
},
Status: loadStatusBytes,
CreatedTime: loadTime,
Status: loadStatusBytes,
Duration: loadDuration,
},
},
},
Expand Down Expand Up @@ -206,9 +208,22 @@ func TestQueryStatusAPI(t *testing.T) {
require.Equal(t, &dmpkg.QueryStatusResponse{ErrorMsg: "task task8 for job not found"}, taskStatus.Status)

jobStatus, err = jm.QueryJobStatus(ctx, nil)

require.NoError(t, err)

for task, currentStatus := range jobStatus.TaskStatus {
switch currentStatus.Status.Unit {
case frameModel.WorkerDMDump:
require.True(t, currentStatus.Duration-time.Since(dumpTime) < time.Second)
case frameModel.WorkerDMLoad:
require.True(t, currentStatus.Duration-time.Since(loadTime) < time.Second)
case frameModel.WorkerDMSync:
require.True(t, currentStatus.Duration-time.Since(syncTime) < time.Second)
}
// this is for passing follow test, because we can't offer the precise duration in advance
currentStatus.Duration = time.Second
jobStatus.TaskStatus[task] = currentStatus
}

expectedStatus := `{
"job_id": "dm-jobmaster-id",
"task_status": {
Expand All @@ -223,7 +238,7 @@ func TestQueryStatusAPI(t *testing.T) {
"result": null,
"status": null
},
"created_time": "0001-01-01T00:00:00Z"
"duration": 1000000000
},
"task2": {
"expected_stage": "Finished",
Expand All @@ -236,7 +251,7 @@ func TestQueryStatusAPI(t *testing.T) {
"result": null,
"status": null
},
"created_time": "2022-11-04T20:47:57.43382274+08:00"
"duration": 1000000000
},
"task3": {
"expected_stage": "Finished",
Expand All @@ -249,7 +264,7 @@ func TestQueryStatusAPI(t *testing.T) {
"result": null,
"status": null
},
"created_time": "2022-11-04T18:47:57.43382274+08:00"
"duration": 1000000000
},
"task4": {
"expected_stage": "Running",
Expand All @@ -270,7 +285,7 @@ func TestQueryStatusAPI(t *testing.T) {
"progress": "20.00 %"
}
},
"created_time": "2022-11-04T18:47:57.43382274+08:00"
"duration": 1000000000
},
"task5": {
"expected_stage": "Running",
Expand All @@ -292,7 +307,7 @@ func TestQueryStatusAPI(t *testing.T) {
"bps": 1000
}
},
"created_time": "2022-11-04T19:47:57.43382274+08:00"
"duration": 1000000000
},
"task6": {
"expected_stage": "Running",
Expand Down Expand Up @@ -330,7 +345,7 @@ func TestQueryStatusAPI(t *testing.T) {
"recentRps": 10
}
},
"created_time": "2022-11-04T20:47:57.43382274+08:00"
"duration": 1000000000
},
"task7": {
"expected_stage": "Finished",
Expand All @@ -343,7 +358,7 @@ func TestQueryStatusAPI(t *testing.T) {
"result": null,
"status": null
},
"created_time": "2022-11-04T20:47:57.43382274+08:00"
"duration": 1000000000
}
},
"finished_unit_status": {
Expand All @@ -363,7 +378,7 @@ func TestQueryStatusAPI(t *testing.T) {
"bps": 1000,
"progress": "20.00 %"
},
"CreatedTime": "2022-11-04T18:47:57.43382274+08:00"
"Duration": 3600000000000
},
{
"Unit": "DMLoadTask",
Expand All @@ -379,7 +394,7 @@ func TestQueryStatusAPI(t *testing.T) {
"metaBinlogGTID": "1-2-3",
"bps": 1000
},
"CreatedTime": "2022-11-04T19:47:57.43382274+08:00"
"Duration": 60000000000
}
],
"task7": [
Expand All @@ -398,7 +413,7 @@ func TestQueryStatusAPI(t *testing.T) {
"bps": 1000,
"progress": "20.00 %"
},
"CreatedTime": "2022-11-04T18:47:57.43382274+08:00"
"Duration": 3600000000000
},
{
"Unit": "DMLoadTask",
Expand All @@ -414,7 +429,7 @@ func TestQueryStatusAPI(t *testing.T) {
"metaBinlogGTID": "1-2-3",
"bps": 1000
},
"CreatedTime": "2022-11-04T19:47:57.43382274+08:00"
"Duration": 60000000000
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (jm *JobMaster) onWorkerFinished(finishedTaskStatus runtime.FinishedTaskSta

unitStateStore := jm.metadata.UnitStateStore()
err := unitStateStore.ReadModifyWrite(context.TODO(), func(state *metadata.UnitState) error {
finishedTaskStatus.CreatedTime = state.CurrentUnitStatus[taskStatus.Task].CreatedTime
finishedTaskStatus.Duration = time.Since(state.CurrentUnitStatus[taskStatus.Task].CreatedTime)
for i, status := range state.FinishedUnitStatus[taskStatus.Task] {
// when the unit is restarted by update-cfg or something, overwrite the old status and truncate
if status.Unit == taskStatus.Unit {
Expand Down
10 changes: 6 additions & 4 deletions engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,9 @@ func TestDuplicateFinishedState(t *testing.T) {
mockBaseJobmaster.On("GetWorkers").Return(map[string]framework.WorkerHandle{}).Once()
err := jm.initComponents()
require.NoError(t, err)
dumpTime, _ := time.Parse(time.RFC3339Nano, "2022-11-04T18:47:57.43382274+08:00")
loadTime, _ := time.Parse(time.RFC3339Nano, "2022-11-04T19:47:57.43382274+08:00")
dumpDuration := time.Hour
loadDuration := time.Hour
state := &metadata.UnitState{
CurrentUnitStatus: map[string]*metadata.UnitStatus{
"task2": {
Expand All @@ -449,7 +450,7 @@ func TestDuplicateFinishedState(t *testing.T) {
Stage: metadata.StageFinished,
CfgModRevision: 3,
},
CreatedTime: dumpTime,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Expand All @@ -458,7 +459,7 @@ func TestDuplicateFinishedState(t *testing.T) {
Stage: metadata.StageFinished,
CfgModRevision: 3,
},
CreatedTime: loadTime,
Duration: loadDuration,
},
},
},
Expand Down Expand Up @@ -486,7 +487,8 @@ func TestDuplicateFinishedState(t *testing.T) {
require.Equal(t, 2, len(state2.FinishedUnitStatus["task2"]))
require.Equal(t, frameModel.WorkerDMLoad, state2.FinishedUnitStatus["task2"][1].Unit)
require.Equal(t, uint64(4), state2.FinishedUnitStatus["task2"][1].CfgModRevision)
require.Equal(t, loadTime, state2.FinishedUnitStatus["task2"][1].CreatedTime)
// the correct duration is that from loadTime to now
require.NotEqual(t, loadDuration, state2.FinishedUnitStatus["task2"][1].Duration)
}

// TODO: move to separate file
Expand Down
6 changes: 3 additions & 3 deletions engine/jobmaster/dm/metadata/unit_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ type TaskStatus struct {
// It only used when a task is finished.
type FinishedTaskStatus struct {
TaskStatus
Result *pb.ProcessResult
Status json.RawMessage
CreatedTime time.Time
Result *pb.ProcessResult
Status json.RawMessage
Duration time.Duration
}

// UnitStatus defines the unit status.
Expand Down

0 comments on commit da92740

Please sign in to comment.