diff --git a/engine/client/task_dispatcher.go b/engine/client/task_dispatcher.go index 5fbf93132e7..3c9696148a9 100644 --- a/engine/client/task_dispatcher.go +++ b/engine/client/task_dispatcher.go @@ -85,6 +85,7 @@ func (d *TaskDispatcher) DispatchTask( ) error { requestID, err := d.preDispatchTaskWithRetry(ctx, args) if err != nil { + abortWorker(err) return derrors.ErrExecutorPreDispatchFailed.Wrap(err) } diff --git a/engine/client/task_dispatcher_test.go b/engine/client/task_dispatcher_test.go index 1aa0e8461db..5033a30ba85 100644 --- a/engine/client/task_dispatcher_test.go +++ b/engine/client/task_dispatcher_test.go @@ -92,14 +92,17 @@ func TestPreDispatchAborted(t *testing.T) { Return((*ExecutorResponse)(nil), status.Error(codes.Aborted, "aborted error")). Once() // Aborted calls should NOT be retried. + var abortCalled atomic.Bool err := dispatcher.DispatchTask(context.Background(), args, func() { require.Fail(t, "the callback should never be called") }, func(error) { - require.Fail(t, "not expected") + require.False(t, abortCalled.Swap(true)) }) require.Error(t, err) require.Regexp(t, ".*aborted error.*", err) mockExecClient.AssertExpectations(t) + + require.True(t, abortCalled.Load()) } func TestAlreadyExistsPanics(t *testing.T) { @@ -144,8 +147,9 @@ func TestDispatchRetryCanceled(t *testing.T) { } var ( - retryCount atomic.Int64 - wg sync.WaitGroup + retryCount atomic.Int64 + abortCalled atomic.Bool + wg sync.WaitGroup ) mockExecClient.On("Send", mock.Anything, mock.Anything). Return((*ExecutorResponse)(nil), status.Error(codes.Unknown, "should retry")).Run( @@ -168,10 +172,11 @@ func TestDispatchRetryCanceled(t *testing.T) { err := dispatcher.DispatchTask(cancelCtx, args, func() { require.Fail(t, "the callback should never be called") }, func(error) { - require.Fail(t, "not expected") + require.False(t, abortCalled.Swap(true)) }) require.Error(t, err) require.Regexp(t, ".*ErrExecutorPreDispatchFailed.*", err) + require.True(t, abortCalled.Load()) wg.Wait() } diff --git a/engine/lib/master/worker_manager_test.go b/engine/lib/master/worker_manager_test.go index a0a8a8d688d..d431a422b46 100644 --- a/engine/lib/master/worker_manager_test.go +++ b/engine/lib/master/worker_manager_test.go @@ -263,6 +263,22 @@ func TestCreateWorkerAndWorkerTimesOut(t *testing.T) { suite.Close() } +func TestCreateWorkerPredispatchFailed(t *testing.T) { + t.Parallel() + + suite := NewWorkerManageTestSuite(true) + suite.manager.AbortCreatingWorker("worker-1", errors.New("injected error")) + + event := suite.WaitForEvent(t, "worker-1") + require.Equal(t, workerDispatchFailedEvent, event.Tp) + require.NotNil(t, event.Handle.GetTombstone()) + require.Error(t, event.Err) + require.Regexp(t, ".*injected error.*", event.Err) + + suite.AssertNoEvents(t, "worker-1", 500*time.Millisecond) + suite.Close() +} + func TestCreateWorkerAndWorkerStatusUpdatedAndTimesOut(t *testing.T) { t.Parallel()