Skip to content

Commit

Permalink
disttask: record subtask start/update time (#46361)
Browse files Browse the repository at this point in the history
close #45990
  • Loading branch information
D3Hunter authored Aug 24, 2023
1 parent d6f993b commit c38f0c1
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 47 deletions.
11 changes: 8 additions & 3 deletions disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,14 @@ type Subtask struct {
// SchedulerID is the ID of scheduler, right now it's the same as instance_id, exec_id.
// its value is IP:PORT, see GenerateExecID
SchedulerID string
StartTime uint64
EndTime time.Time
Meta []byte
// StartTime is the time when the subtask is started.
// it's 0 if it hasn't started yet.
StartTime time.Time
// UpdateTime is the time when the subtask is updated.
// it can be used as subtask end time if the subtask is finished.
// it's 0 if it hasn't started yet.
UpdateTime time.Time
Meta []byte
}

// NewSubtask create a new subtask.
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
type TaskTable interface {
GetGlobalTasksInStates(states ...interface{}) (task []*proto.Task, err error)
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)

GetSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
StartSubtask(id int64) error
UpdateSubtaskStateAndError(id int64, state string, err error) error
FinishSubtask(id int64, meta []byte) error
HasSubtasksInStates(instanceID string, taskID int64, step int64, states ...interface{}) (bool, error)
Expand Down
6 changes: 6 additions & 0 deletions disttask/framework/scheduler/interface_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ func (t *MockTaskTable) GetGlobalTasksInStates(states ...interface{}) ([]*proto.
}
}

// StartSubtask implements TaskTable.StartSubtask.
func (t *MockTaskTable) StartSubtask(id int64) error {
args := t.Called(id)
return args.Error(0)
}

// GetGlobalTaskByID implements TaskTable.GetTaskByID.
func (t *MockTaskTable) GetGlobalTaskByID(id int64) (*proto.Task, error) {
args := t.Called(id)
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Manager struct {
// cancelFunc is used to fast cancel the scheduler.Run.
handlingTasks map[int64]context.CancelFunc
}
// id, it's the same as server id now, i.e. host:port.
id string
wg sync.WaitGroup
ctx context.Context
Expand Down
14 changes: 11 additions & 3 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ var TestSyncChan = make(chan struct{})

// InternalSchedulerImpl is the implementation of InternalScheduler.
type InternalSchedulerImpl struct {
ctx context.Context
cancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
// id, it's the same as server id now, i.e. host:port.
id string
taskID int64
taskTable TaskTable
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error
if subtask == nil {
break
}
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateRunning, nil)
s.startSubtask(subtask.ID)
if err := s.getError(); err != nil {
break
}
Expand Down Expand Up @@ -378,6 +379,13 @@ func (s *InternalSchedulerImpl) resetError() {
s.mu.handled = false
}

func (s *InternalSchedulerImpl) startSubtask(id int64) {
err := s.taskTable.StartSubtask(id)
if err != nil {
s.onError(err)
}
}

func (s *InternalSchedulerImpl) updateSubtaskStateAndError(id int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(id, state, subTaskErr)
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(updateSubtaskErr).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(updateSubtaskErr).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID})
require.EqualError(t, err, updateSubtaskErr.Error())
Expand All @@ -89,7 +89,7 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateFailed).Return(nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand All @@ -105,7 +105,7 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateFailed).Return(nil).Once()
Expand All @@ -117,7 +117,7 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return([]byte(""), nil).Once()
Expand All @@ -134,14 +134,14 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", int64(1), proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", int64(1)).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return([]byte(""), nil).Once()
mockSubtaskTable.On("FinishSubtask", int64(1), mock.Anything).Return(nil).Once()

mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 2, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", int64(2), proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", int64(2)).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return([]byte(""), nil).Once()
Expand All @@ -156,7 +156,7 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(context.Canceled).Once()
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestSchedulerRun(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}, MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once().Run(func(args mock.Arguments) {
<-syncCh
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestScheduler(t *testing.T) {
mockScheduler.On("InitSubtaskExecEnv", mock.Anything).Return(nil).Once()
mockPool.On("RunWithConcurrency", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", proto.StepOne, taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockSubtaskTable.On("StartSubtask", taskID).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(runSubtaskErr).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateFailed).Return(nil).Once()
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_test(
"//testkit",
"//testkit/testsetup",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
Expand Down
75 changes: 58 additions & 17 deletions disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
Expand Down Expand Up @@ -134,17 +135,19 @@ func TestSubTaskTable(t *testing.T) {
require.NoError(t, err)
require.Nil(t, nilTask)

task, err := sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending)
subtask, err := sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending)
require.NoError(t, err)
require.Equal(t, proto.TaskTypeExample, task.Type)
require.Equal(t, int64(1), task.TaskID)
require.Equal(t, proto.TaskStatePending, task.State)
require.Equal(t, "tidb1", task.SchedulerID)
require.Equal(t, []byte("test"), task.Meta)
require.Equal(t, proto.TaskTypeExample, subtask.Type)
require.Equal(t, int64(1), subtask.TaskID)
require.Equal(t, proto.TaskStatePending, subtask.State)
require.Equal(t, "tidb1", subtask.SchedulerID)
require.Equal(t, []byte("test"), subtask.Meta)
require.Zero(t, subtask.StartTime)
require.Zero(t, subtask.UpdateTime)

task2, err := sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending, proto.TaskStateReverted)
subtask2, err := sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending, proto.TaskStateReverted)
require.NoError(t, err)
require.Equal(t, task, task2)
require.Equal(t, subtask, subtask2)

ids, err := sm.GetSchedulerIDsByTaskID(1)
require.NoError(t, err)
Expand All @@ -170,20 +173,31 @@ func TestSubTaskTable(t *testing.T) {
err = sm.UpdateSubtaskHeartbeat("tidb1", 1, time.Now())
require.NoError(t, err)

err = sm.UpdateSubtaskStateAndError(1, proto.TaskStateRunning, nil)
ts := time.Now()
time.Sleep(time.Second)
require.NoError(t, sm.StartSubtask(1))

subtask, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending)
require.NoError(t, err)
require.Nil(t, subtask)

task, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending)
subtask, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateRunning)
require.NoError(t, err)
require.Nil(t, task)
require.Equal(t, proto.TaskTypeExample, subtask.Type)
require.Equal(t, int64(1), subtask.TaskID)
require.Equal(t, proto.TaskStateRunning, subtask.State)
require.Equal(t, "tidb1", subtask.SchedulerID)
require.Equal(t, []byte("test"), subtask.Meta)
require.GreaterOrEqual(t, subtask.StartTime, ts)
require.GreaterOrEqual(t, subtask.UpdateTime, ts)

task, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateRunning)
// check update time after state change to cancel
time.Sleep(time.Second)
require.NoError(t, sm.UpdateSubtaskStateAndError(1, proto.TaskStateCancelling, nil))
subtask2, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateCancelling)
require.NoError(t, err)
require.Equal(t, proto.TaskTypeExample, task.Type)
require.Equal(t, int64(1), task.TaskID)
require.Equal(t, proto.TaskStateRunning, task.State)
require.Equal(t, "tidb1", task.SchedulerID)
require.Equal(t, []byte("test"), task.Meta)
require.Equal(t, proto.TaskStateCancelling, subtask2.State)
require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime)

cnt, err = sm.GetSubtaskInStatesCnt(1, proto.TaskStatePending)
require.NoError(t, err)
Expand Down Expand Up @@ -215,6 +229,33 @@ func TestSubTaskTable(t *testing.T) {
subtasks, err = sm.GetSucceedSubtasksByStep(2, proto.StepInit)
require.NoError(t, err)
require.Len(t, subtasks, 1)

// test UpdateErrorToSubtask do update start/update time
err = sm.AddNewSubTask(3, proto.StepInit, "for_test", []byte("test"), proto.TaskTypeExample, false)
require.NoError(t, err)
require.NoError(t, sm.UpdateErrorToSubtask("for_test", errors.New("fail")))
subtask, err = sm.GetSubtaskInStates("for_test", 3, proto.StepInit, proto.TaskStateFailed)
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, subtask.State)
require.Greater(t, subtask.StartTime, ts)
require.Greater(t, subtask.UpdateTime, ts)

// test FinishSubtask do update update time
err = sm.AddNewSubTask(4, proto.StepInit, "for_test1", []byte("test"), proto.TaskTypeExample, false)
require.NoError(t, err)
subtask, err = sm.GetSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStatePending)
require.NoError(t, err)
require.NoError(t, sm.StartSubtask(subtask.ID))
subtask, err = sm.GetSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStateRunning)
require.NoError(t, err)
require.Greater(t, subtask.StartTime, ts)
require.Greater(t, subtask.UpdateTime, ts)
time.Sleep(time.Second)
require.NoError(t, sm.FinishSubtask(subtask.ID, []byte{}))
subtask2, err = sm.GetSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStateSucceed)
require.NoError(t, err)
require.Equal(t, subtask2.StartTime, subtask.StartTime)
require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime)
}

func TestBothGlobalAndSubTaskTable(t *testing.T) {
Expand Down
Loading

0 comments on commit c38f0c1

Please sign in to comment.