Skip to content

Commit

Permalink
Update task executor to handle WorkflowAlreadyCompletedError for sign…
Browse files Browse the repository at this point in the history
…al and cancel workflow
  • Loading branch information
Shaddoll committed Apr 30, 2024
1 parent bf088f2 commit e99e02f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 13 deletions.
10 changes: 10 additions & 0 deletions service/history/task/cross_cluster_source_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return requestCancelExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -265,6 +269,7 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask(
taskInfo.TargetWorkflowID,
taskInfo.TargetRunID,
now,
cause,
)
}
return requestCancelExternalExecutionCompleted(
Expand Down Expand Up @@ -479,6 +484,10 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(

if failedCause != nil {
// remaining errors are non-retryable
cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution
if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted
}
return signalExternalExecutionFailed(
ctx,
taskInfo,
Expand All @@ -488,6 +497,7 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask(
taskInfo.TargetRunID,
signalInfo.Control,
now,
cause,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (s *crossClusterSourceTaskExecutorSuite) TestExecuteCancelExecution_Failure
&types.CrossClusterTaskResponse{
TaskType: types.CrossClusterTaskTypeCancelExecution.Ptr(),
TaskState: int16(processingStateInitialized),
FailedCause: types.CrossClusterTaskFailedCauseWorkflowNotExists.Ptr(),
FailedCause: types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted.Ptr(),
},
func(
mutableState execution.MutableState,
Expand Down
36 changes: 26 additions & 10 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,

Check warning on line 633 in service/history/task/transfer_active_task_executor.go

View check run for this annotation

Codecov / codecov/patch

service/history/task/transfer_active_task_executor.go#L633

Added line #L633 was not covered by tests
)
return err
}
Expand All @@ -650,10 +651,15 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
tag.TargetWorkflowRunID(task.TargetRunID),
tag.Error(err))

// Check to see if the error is non-transient, in which case add RequestCancelFailed
// event and complete transfer task by setting the err = nil
if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) {
// for retryable error just return
var notExistsErr *types.EntityNotExistsError
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
var cause *types.CancelExternalWorkflowExecutionFailedCause
if errors.As(err, &notExistsErr) {
cause = types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution.Ptr()
} else if errors.As(err, &alreadyCompletedErr) {
cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted.Ptr()
}
if cause == nil {
return err
}
return requestCancelExternalExecutionFailed(
Expand All @@ -664,6 +670,7 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
task.TargetWorkflowID,
task.TargetRunID,
t.shard.GetTimeSource().Now(),
*cause,
)
}

Expand Down Expand Up @@ -750,6 +757,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,

Check warning on line 760 in service/history/task/transfer_active_task_executor.go

View check run for this annotation

Codecov / codecov/patch

service/history/task/transfer_active_task_executor.go#L760

Added line #L760 was not covered by tests
)
}

Expand All @@ -769,10 +777,15 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
tag.TargetWorkflowRunID(task.TargetRunID),
tag.Error(err))

// Check to see if the error is non-transient, in which case add SignalFailed
// event and complete transfer task by setting the err = nil
if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) {
// for retryable error just return
var notExistsErr *types.EntityNotExistsError
var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError
var cause *types.SignalExternalWorkflowExecutionFailedCause
if errors.As(err, &notExistsErr) {
cause = types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution.Ptr()
} else if errors.As(err, &alreadyCompletedErr) {
cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted.Ptr()
}
if cause == nil {
return err
}
return signalExternalExecutionFailed(
Expand All @@ -784,6 +797,7 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
task.TargetRunID,
signalInfo.Control,
t.shard.GetTimeSource().Now(),
*cause,
)
}

Expand Down Expand Up @@ -1419,6 +1433,7 @@ func requestCancelExternalExecutionFailed(
targetWorkflowID string,
targetRunID string,
now time.Time,
cause types.CancelExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1439,7 +1454,7 @@ func requestCancelExternalExecutionFailed(
targetDomain,
targetWorkflowID,
targetRunID,
types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
cause,
)
return err
},
Expand All @@ -1464,6 +1479,7 @@ func signalExternalExecutionFailed(
targetRunID string,
control []byte,
now time.Time,
cause types.SignalExternalWorkflowExecutionFailedCause,
) error {

err := updateWorkflowExecution(ctx, wfContext, true,
Expand All @@ -1485,7 +1501,7 @@ func signalExternalExecutionFailed(
targetWorkflowID,
targetRunID,
control,
types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution,
cause,
)
return err
},
Expand Down
45 changes: 43 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Success() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() {
func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotExistsError() {
s.testProcessCancelExecution(
s.targetDomainID,
func(
Expand All @@ -1067,6 +1067,27 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_WorkflowAlreadyCompleted() {
s.testProcessCancelExecution(
s.targetDomainID,
func(
mutableState execution.MutableState,
workflowExecution, targetExecution types.WorkflowExecution,
event *types.HistoryEvent,
transferTask Task,
requestCancelInfo *persistence.RequestCancelInfo,
) {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
cancelRequest := createTestRequestCancelWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), requestCancelInfo.CancelRequestID)
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), cancelRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
},
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Duplication() {
s.testProcessCancelExecution(
s.targetDomainID,
Expand Down Expand Up @@ -1202,7 +1223,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Success() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() {
func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotExistsError() {
s.testProcessSignalExecution(
s.targetDomainID,
func(
Expand All @@ -1223,6 +1244,26 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() {
)
}

func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_WorkflowAlreadyCompletedError() {
s.testProcessSignalExecution(
s.targetDomainID,
func(
mutableState execution.MutableState,
workflowExecution, targetExecution types.WorkflowExecution,
event *types.HistoryEvent,
transferTask Task,
signalInfo *persistence.SignalInfo,
) {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
signalRequest := createTestSignalWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), signalInfo)
s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), signalRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1)
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()
},
)
}
func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() {
s.testProcessSignalExecution(
s.targetDomainID,
Expand Down

0 comments on commit e99e02f

Please sign in to comment.