Skip to content

Commit

Permalink
lib/master(engine): fix PreDispatch failure handling (#5603)
Browse files Browse the repository at this point in the history
close #5589
  • Loading branch information
liuzix authored May 26, 2022
1 parent 1c5e550 commit e87ea0f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
1 change: 1 addition & 0 deletions engine/client/task_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (d *TaskDispatcher) DispatchTask(
) error {
requestID, err := d.preDispatchTaskWithRetry(ctx, args)
if err != nil {
abortWorker(err)
return derrors.ErrExecutorPreDispatchFailed.Wrap(err)
}

Expand Down
13 changes: 9 additions & 4 deletions engine/client/task_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ func TestPreDispatchAborted(t *testing.T) {
Return((*ExecutorResponse)(nil), status.Error(codes.Aborted, "aborted error")).
Once() // Aborted calls should NOT be retried.

var abortCalled atomic.Bool
err := dispatcher.DispatchTask(context.Background(), args, func() {
require.Fail(t, "the callback should never be called")
}, func(error) {
require.Fail(t, "not expected")
require.False(t, abortCalled.Swap(true))
})
require.Error(t, err)
require.Regexp(t, ".*aborted error.*", err)
mockExecClient.AssertExpectations(t)

require.True(t, abortCalled.Load())
}

func TestAlreadyExistsPanics(t *testing.T) {
Expand Down Expand Up @@ -144,8 +147,9 @@ func TestDispatchRetryCanceled(t *testing.T) {
}

var (
retryCount atomic.Int64
wg sync.WaitGroup
retryCount atomic.Int64
abortCalled atomic.Bool
wg sync.WaitGroup
)
mockExecClient.On("Send", mock.Anything, mock.Anything).
Return((*ExecutorResponse)(nil), status.Error(codes.Unknown, "should retry")).Run(
Expand All @@ -168,10 +172,11 @@ func TestDispatchRetryCanceled(t *testing.T) {
err := dispatcher.DispatchTask(cancelCtx, args, func() {
require.Fail(t, "the callback should never be called")
}, func(error) {
require.Fail(t, "not expected")
require.False(t, abortCalled.Swap(true))
})
require.Error(t, err)
require.Regexp(t, ".*ErrExecutorPreDispatchFailed.*", err)
require.True(t, abortCalled.Load())

wg.Wait()
}
Expand Down
16 changes: 16 additions & 0 deletions engine/lib/master/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ func TestCreateWorkerAndWorkerTimesOut(t *testing.T) {
suite.Close()
}

func TestCreateWorkerPredispatchFailed(t *testing.T) {
t.Parallel()

suite := NewWorkerManageTestSuite(true)
suite.manager.AbortCreatingWorker("worker-1", errors.New("injected error"))

event := suite.WaitForEvent(t, "worker-1")
require.Equal(t, workerDispatchFailedEvent, event.Tp)
require.NotNil(t, event.Handle.GetTombstone())
require.Error(t, event.Err)
require.Regexp(t, ".*injected error.*", event.Err)

suite.AssertNoEvents(t, "worker-1", 500*time.Millisecond)
suite.Close()
}

func TestCreateWorkerAndWorkerStatusUpdatedAndTimesOut(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit e87ea0f

Please sign in to comment.