Skip to content

Commit

Permalink
Follow ReusePolicy AllowDuplicateFailedOnly with TerminateExisting (#…
Browse files Browse the repository at this point in the history
…7097)

## What changed?
<!-- Describe what has changed in this PR -->

Make WorkflowIdConflictPolicy TerminateExisting follow the
WorkflowIdReusePolicy after an _unsuccessful_ termination.

NOTE: The frontend change was made in
#7099

## Why?
<!-- Tell your future self why have you made these changes -->

When the termination from TerminateExisting fails, the user would expect
the WorkflowIdReusePolicy to be applied as the Workflow is not running.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

Well ... there's no way to test this well right now. There are no unit
tests; and the functional test cannot be written without the use of
[testhooks](#6938) since
there is no other way to simulate the race condition here.

I manually added a `sync.Once` into the code that terminates the
workflow and can confirm the expected behavior.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Jan 17, 2025
1 parent 95e22ff commit e08772a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
27 changes: 18 additions & 9 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package startworkflow
import (
"context"
"errors"
"fmt"
"time"

commonpb "go.temporal.io/api/common/v1"
Expand All @@ -37,6 +38,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/enums"
"go.temporal.io/server/common/locks"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -74,13 +76,14 @@ const (

// Starter starts a new workflow execution.
type Starter struct {
shardContext shard.Context
workflowConsistencyChecker api.WorkflowConsistencyChecker
tokenSerializer common.TaskTokenSerializer
visibilityManager manager.VisibilityManager
request *historyservice.StartWorkflowExecutionRequest
namespace *namespace.Namespace
createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc
shardContext shard.Context
workflowConsistencyChecker api.WorkflowConsistencyChecker
tokenSerializer common.TaskTokenSerializer
visibilityManager manager.VisibilityManager
request *historyservice.StartWorkflowExecutionRequest
namespace *namespace.Namespace
createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc
followReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]
}

// creationParams is a container for all information obtained from creating the uncommitted execution.
Expand Down Expand Up @@ -124,6 +127,7 @@ func NewStarter(
request: request,
namespace: namespaceEntry,
createOrUpdateLeaseFn: createLeaseFn,
followReusePolicyAfterConflictPolicyTerminate: shardContext.GetConfig().FollowReusePolicyAfterConflictPolicyTerminate,
}, nil
}

Expand Down Expand Up @@ -493,8 +497,13 @@ func (s *Starter) resolveDuplicateWorkflowID(
resp, err := s.generateResponse(newRunID, mutableStateInfo.workflowTask, events)
return resp, StartNew, err
case consts.ErrWorkflowCompleted:
// current workflow already closed
// fallthough to the logic for only creating the new workflow below
if s.followReusePolicyAfterConflictPolicyTerminate(s.namespace.Name().String()) {
// Exit and retry again from the top.
// By returning an Unavailable service error, the entire Start request will be retried.
// NOTE: This WorkflowIDReusePolicy cannot be RejectDuplicate as the frontend will reject that.
return nil, StartErr, serviceerror.NewUnavailable(fmt.Sprintf("Termination failed: %v", err))
}
// Fallthough to the logic for only creating the new workflow below.
return nil, StartNew, nil
default:
return nil, StartErr, err
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ type Config struct {
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn

FollowReusePolicyAfterConflictPolicyTerminate dynamicconfig.TypedPropertyFnWithNamespaceFilter[bool]

// ArchivalQueueProcessor settings
ArchivalProcessorSchedulerWorkerCount dynamicconfig.TypedSubscribable[int]
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -548,6 +550,8 @@ func NewConfig(
ReplicationProgressCacheMaxSize: dynamicconfig.ReplicationProgressCacheMaxSize.Get(dc),
ReplicationProgressCacheTTL: dynamicconfig.ReplicationProgressCacheTTL.Get(dc),

FollowReusePolicyAfterConflictPolicyTerminate: dynamicconfig.FollowReusePolicyAfterConflictPolicyTerminate.Get(dc),

MaximumBufferedEventsBatch: dynamicconfig.MaximumBufferedEventsBatch.Get(dc),
MaximumBufferedEventsSizeInBytes: dynamicconfig.MaximumBufferedEventsSizeInBytes.Get(dc),
MaximumSignalsPerExecution: dynamicconfig.MaximumSignalsPerExecution.Get(dc),
Expand Down
6 changes: 5 additions & 1 deletion tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution() {
}

func (s *WorkflowTestSuite) TestStartWorkflowExecution_Terminate() {

// setting this to 0 to be sure we are terminating old workflow
s.OverrideDynamicConfig(dynamicconfig.WorkflowIdReuseMinimalInterval, 0)

Expand All @@ -147,6 +146,11 @@ func (s *WorkflowTestSuite) TestStartWorkflowExecution_Terminate() {
enumspb.WORKFLOW_ID_REUSE_POLICY_UNSPECIFIED,
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
},
{
"TerminateExisting with AllowDuplicateFailedOnly",
enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING,
},
}

for i, tc := range testCases {
Expand Down

0 comments on commit e08772a

Please sign in to comment.