From e08772a9f0d41824a0c6043dac305e979d193f8d Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 17 Jan 2025 10:41:07 -0800 Subject: [PATCH] Follow ReusePolicy AllowDuplicateFailedOnly with TerminateExisting (#7097) ## What changed? Make WorkflowIdConflictPolicy TerminateExisting follow the WorkflowIdReusePolicy after an _unsuccessful_ termination. NOTE: The frontend change was made in https://github.com/temporalio/temporal/pull/7099 ## Why? When the termination from TerminateExisting fails, the user would expect the WorkflowIdReusePolicy to be applied as the Workflow is not running. ## How did you test it? Well ... there's no way to test this well right now. There are no unit tests; and the functional test cannot be written without the use of [testhooks](https://github.com/temporalio/temporal/pull/6938) since there is no other way to simulate the race condition here. I manually added a `sync.Once` into the code that terminates the workflow and can confirm the expected behavior. ## Potential risks ## Documentation ## Is hotfix candidate? --- service/history/api/startworkflow/api.go | 27 ++++++++++++++++-------- service/history/configs/config.go | 4 ++++ tests/workflow_test.go | 6 +++++- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index 94f9ae47226..848b2ad7e36 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -27,6 +27,7 @@ package startworkflow import ( "context" "errors" + "fmt" "time" commonpb "go.temporal.io/api/common/v1" @@ -37,6 +38,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/enums" "go.temporal.io/server/common/locks" "go.temporal.io/server/common/metrics" @@ -74,13 +76,14 @@ const ( // Starter starts a new workflow execution. type Starter struct { - shardContext shard.Context - workflowConsistencyChecker api.WorkflowConsistencyChecker - tokenSerializer common.TaskTokenSerializer - visibilityManager manager.VisibilityManager - request *historyservice.StartWorkflowExecutionRequest - namespace *namespace.Namespace - createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc + shardContext shard.Context + workflowConsistencyChecker api.WorkflowConsistencyChecker + tokenSerializer common.TaskTokenSerializer + visibilityManager manager.VisibilityManager + request *historyservice.StartWorkflowExecutionRequest + namespace *namespace.Namespace + createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc + followReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool] } // creationParams is a container for all information obtained from creating the uncommitted execution. @@ -124,6 +127,7 @@ func NewStarter( request: request, namespace: namespaceEntry, createOrUpdateLeaseFn: createLeaseFn, + followReusePolicyAfterConflictPolicyTerminate: shardContext.GetConfig().FollowReusePolicyAfterConflictPolicyTerminate, }, nil } @@ -493,8 +497,13 @@ func (s *Starter) resolveDuplicateWorkflowID( resp, err := s.generateResponse(newRunID, mutableStateInfo.workflowTask, events) return resp, StartNew, err case consts.ErrWorkflowCompleted: - // current workflow already closed - // fallthough to the logic for only creating the new workflow below + if s.followReusePolicyAfterConflictPolicyTerminate(s.namespace.Name().String()) { + // Exit and retry again from the top. + // By returning an Unavailable service error, the entire Start request will be retried. + // NOTE: This WorkflowIDReusePolicy cannot be RejectDuplicate as the frontend will reject that. + return nil, StartErr, serviceerror.NewUnavailable(fmt.Sprintf("Termination failed: %v", err)) + } + // Fallthough to the logic for only creating the new workflow below. return nil, StartNew, nil default: return nil, StartErr, err diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 8b4a9c72062..22ed5eee8b6 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -349,6 +349,8 @@ type Config struct { EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn + FollowReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool] + // ArchivalQueueProcessor settings ArchivalProcessorSchedulerWorkerCount dynamicconfig.TypedSubscribable[int] ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn @@ -548,6 +550,8 @@ func NewConfig( ReplicationProgressCacheMaxSize: dynamicconfig.ReplicationProgressCacheMaxSize.Get(dc), ReplicationProgressCacheTTL: dynamicconfig.ReplicationProgressCacheTTL.Get(dc), + FollowReusePolicyAfterConflictPolicyTerminate: dynamicconfig.FollowReusePolicyAfterConflictPolicyTerminate.Get(dc), + MaximumBufferedEventsBatch: dynamicconfig.MaximumBufferedEventsBatch.Get(dc), MaximumBufferedEventsSizeInBytes: dynamicconfig.MaximumBufferedEventsSizeInBytes.Get(dc), MaximumSignalsPerExecution: dynamicconfig.MaximumSignalsPerExecution.Get(dc), diff --git a/tests/workflow_test.go b/tests/workflow_test.go index 6d15a15daaa..140d4e0b0ce 100644 --- a/tests/workflow_test.go +++ b/tests/workflow_test.go @@ -128,7 +128,6 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution() { } func (s *WorkflowTestSuite) TestStartWorkflowExecution_Terminate() { - // setting this to 0 to be sure we are terminating old workflow s.OverrideDynamicConfig(dynamicconfig.WorkflowIdReuseMinimalInterval, 0) @@ -147,6 +146,11 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution_Terminate() { enumspb.WORKFLOW_ID_REUSE_POLICY_UNSPECIFIED, enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING, }, + { + "TerminateExisting with AllowDuplicateFailedOnly", + enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, + enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING, + }, } for i, tc := range testCases {