Skip to content

Commit

Permalink
Update parent close policy to terminate/cancel child workflows even a…
Browse files Browse the repository at this point in the history
…fter continue as new (uber#5032)
  • Loading branch information
Shaddoll authored Nov 28, 2022
1 parent d249434 commit 477c65b
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 77 deletions.
44 changes: 24 additions & 20 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2648,11 +2648,12 @@ func FromRequestCancelWorkflowExecutionRequest(t *types.RequestCancelWorkflowExe
return nil
}
return &apiv1.RequestCancelWorkflowExecutionRequest{
Domain: t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Identity: t.Identity,
RequestId: t.RequestID,
Cause: t.Cause,
Domain: t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Identity: t.Identity,
RequestId: t.RequestID,
Cause: t.Cause,
FirstExecutionRunId: t.FirstExecutionRunID,
}
}

Expand All @@ -2661,11 +2662,12 @@ func ToRequestCancelWorkflowExecutionRequest(t *apiv1.RequestCancelWorkflowExecu
return nil
}
return &types.RequestCancelWorkflowExecutionRequest{
Domain: t.Domain,
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Identity: t.Identity,
RequestID: t.RequestId,
Cause: t.Cause,
Domain: t.Domain,
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Identity: t.Identity,
RequestID: t.RequestId,
Cause: t.Cause,
FirstExecutionRunID: t.FirstExecutionRunId,
}
}

Expand Down Expand Up @@ -3917,11 +3919,12 @@ func FromTerminateWorkflowExecutionRequest(t *types.TerminateWorkflowExecutionRe
return nil
}
return &apiv1.TerminateWorkflowExecutionRequest{
Domain: t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Reason: t.Reason,
Details: FromPayload(t.Details),
Identity: t.Identity,
Domain: t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Reason: t.Reason,
Details: FromPayload(t.Details),
Identity: t.Identity,
FirstExecutionRunId: t.FirstExecutionRunID,
}
}

Expand All @@ -3930,11 +3933,12 @@ func ToTerminateWorkflowExecutionRequest(t *apiv1.TerminateWorkflowExecutionRequ
return nil
}
return &types.TerminateWorkflowExecutionRequest{
Domain: t.Domain,
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Reason: t.Reason,
Details: ToPayload(t.Details),
Identity: t.Identity,
Domain: t.Domain,
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Reason: t.Reason,
Details: ToPayload(t.Details),
Identity: t.Identity,
FirstExecutionRunID: t.FirstExecutionRunId,
}
}

Expand Down
44 changes: 24 additions & 20 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4229,11 +4229,12 @@ func FromRequestCancelWorkflowExecutionRequest(t *types.RequestCancelWorkflowExe
return nil
}
return &shared.RequestCancelWorkflowExecutionRequest{
Domain: &t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Identity: &t.Identity,
RequestId: &t.RequestID,
Cause: &t.Cause,
Domain: &t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Identity: &t.Identity,
RequestId: &t.RequestID,
Cause: &t.Cause,
FirstExecutionRunID: &t.FirstExecutionRunID,
}
}

Expand All @@ -4243,11 +4244,12 @@ func ToRequestCancelWorkflowExecutionRequest(t *shared.RequestCancelWorkflowExec
return nil
}
return &types.RequestCancelWorkflowExecutionRequest{
Domain: t.GetDomain(),
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Identity: t.GetIdentity(),
RequestID: t.GetRequestId(),
Cause: t.GetCause(),
Domain: t.GetDomain(),
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Identity: t.GetIdentity(),
RequestID: t.GetRequestId(),
Cause: t.GetCause(),
FirstExecutionRunID: t.GetFirstExecutionRunID(),
}
}

Expand Down Expand Up @@ -5561,11 +5563,12 @@ func FromTerminateWorkflowExecutionRequest(t *types.TerminateWorkflowExecutionRe
return nil
}
return &shared.TerminateWorkflowExecutionRequest{
Domain: &t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Reason: &t.Reason,
Details: t.Details,
Identity: &t.Identity,
Domain: &t.Domain,
WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution),
Reason: &t.Reason,
Details: t.Details,
Identity: &t.Identity,
FirstExecutionRunID: &t.FirstExecutionRunID,
}
}

Expand All @@ -5575,11 +5578,12 @@ func ToTerminateWorkflowExecutionRequest(t *shared.TerminateWorkflowExecutionReq
return nil
}
return &types.TerminateWorkflowExecutionRequest{
Domain: t.GetDomain(),
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Reason: t.GetReason(),
Details: t.Details,
Identity: t.GetIdentity(),
Domain: t.GetDomain(),
WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution),
Reason: t.GetReason(),
Details: t.Details,
Identity: t.GetIdentity(),
FirstExecutionRunID: t.GetFirstExecutionRunID(),
}
}

Expand Down
38 changes: 28 additions & 10 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4640,11 +4640,12 @@ func (v *RequestCancelExternalWorkflowExecutionInitiatedEventAttributes) GetChil

// RequestCancelWorkflowExecutionRequest is an internal type (TBD...)
type RequestCancelWorkflowExecutionRequest struct {
Domain string `json:"domain,omitempty"`
WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"`
Identity string `json:"identity,omitempty"`
RequestID string `json:"requestId,omitempty"`
Cause string `json:"cause,omitempty"`
Domain string `json:"domain,omitempty"`
WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"`
Identity string `json:"identity,omitempty"`
RequestID string `json:"requestId,omitempty"`
Cause string `json:"cause,omitempty"`
FirstExecutionRunID string `json:"first_execution_run_id,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand All @@ -4671,6 +4672,14 @@ func (v *RequestCancelWorkflowExecutionRequest) GetRequestID() (o string) {
return
}

// GetFirstExecutionRunID is an internal getter (TBD...)
func (v *RequestCancelWorkflowExecutionRequest) GetFirstExecutionRunID() (o string) {
if v != nil {
return v.FirstExecutionRunID
}
return
}

// ResetPointInfo is an internal type (TBD...)
type ResetPointInfo struct {
BinaryChecksum string `json:"binaryChecksum,omitempty"`
Expand Down Expand Up @@ -6295,11 +6304,12 @@ const (

// TerminateWorkflowExecutionRequest is an internal type (TBD...)
type TerminateWorkflowExecutionRequest struct {
Domain string `json:"domain,omitempty"`
WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"`
Reason string `json:"reason,omitempty"`
Details []byte `json:"details,omitempty"`
Identity string `json:"identity,omitempty"`
Domain string `json:"domain,omitempty"`
WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"`
Reason string `json:"reason,omitempty"`
Details []byte `json:"details,omitempty"`
Identity string `json:"identity,omitempty"`
FirstExecutionRunID string `json:"first_execution_run_id,omitempty"`
}

// GetDomain is an internal getter (TBD...)
Expand Down Expand Up @@ -6342,6 +6352,14 @@ func (v *TerminateWorkflowExecutionRequest) GetIdentity() (o string) {
return
}

// GetFirstExecutionRunID is an internal getter (TBD...)
func (v *TerminateWorkflowExecutionRequest) GetFirstExecutionRunID() (o string) {
if v != nil {
return v.FirstExecutionRunID
}
return
}

// TimeoutType is an internal type (TBD...)
type TimeoutType int32

Expand Down
20 changes: 11 additions & 9 deletions common/types/testdata/service_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,11 @@ var (
WorkerVersionInfo: &WorkerVersionInfo,
}
RequestCancelWorkflowExecutionRequest = types.RequestCancelWorkflowExecutionRequest{
Domain: DomainName,
WorkflowExecution: &WorkflowExecution,
Identity: Identity,
RequestID: RequestID,
Domain: DomainName,
WorkflowExecution: &WorkflowExecution,
Identity: Identity,
RequestID: RequestID,
FirstExecutionRunID: RunID,
}
StartWorkflowExecutionRequest = types.StartWorkflowExecutionRequest{
Domain: DomainName,
Expand Down Expand Up @@ -364,11 +365,12 @@ var (
RunID: RunID,
}
TerminateWorkflowExecutionRequest = types.TerminateWorkflowExecutionRequest{
Domain: DomainName,
WorkflowExecution: &WorkflowExecution,
Reason: Reason,
Details: Payload1,
Identity: Identity,
Domain: DomainName,
WorkflowExecution: &WorkflowExecution,
Reason: Reason,
Details: Payload1,
Identity: Identity,
FirstExecutionRunID: RunID,
}
DescribeWorkflowExecutionRequest = types.DescribeWorkflowExecutionRequest{
Domain: DomainName,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.7.1
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20220823092026-486e297e84de
github.com/uber/cadence-idl v0.0.0-20221119005017-6c250ae41984
github.com/uber/ringpop-go v0.8.5
github.com/uber/tchannel-go v1.22.0
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20220823092026-486e297e84de h1:n1JFJtqdjW2VhZAGh+VBErXb8OVFy866sw4CHxaqvf8=
github.com/uber/cadence-idl v0.0.0-20220823092026-486e297e84de/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20221119005017-6c250ae41984 h1:4GqjyNMOxmeDe9an0P52XgpNFufJJ80rktcV0SWE3nk=
github.com/uber/cadence-idl v0.0.0-20221119005017-6c250ae41984/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
44 changes: 41 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2295,7 +2295,10 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution(
childWorkflowOnly := req.GetChildWorkflowOnly()
workflowExecution := types.WorkflowExecution{
WorkflowID: request.WorkflowExecution.WorkflowID,
RunID: request.WorkflowExecution.RunID,
}
// If firstExecutionRunID is set on the request always try to cancel currently running execution
if request.GetFirstExecutionRunID() == "" {
workflowExecution.RunID = request.WorkflowExecution.RunID
}

return workflow.UpdateCurrentWithActionFunc(ctx, e.executionCache, e.executionManager, domainID, e.shard.GetDomainCache(), workflowExecution, e.timeSource.Now(),
Expand All @@ -2313,6 +2316,22 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution(
}

executionInfo := mutableState.GetExecutionInfo()
if request.GetFirstExecutionRunID() != "" {
firstRunID := executionInfo.FirstExecutionRunID
if firstRunID == "" {
// This is needed for backwards compatibility. Workflow execution create with Cadence release v0.25.0 or earlier
// does not have FirstExecutionRunID stored as part of mutable state. If this is not set then load it from
// workflow execution started event.
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return nil, err
}
firstRunID = startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstExecutionRunID()
}
if request.GetFirstExecutionRunID() != firstRunID {
return nil, &types.EntityNotExistsError{Message: "Workflow execution not found"}
}
}
if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowID
parentRunID := executionInfo.ParentRunID
Expand Down Expand Up @@ -2644,7 +2663,10 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
childWorkflowOnly := terminateRequest.GetChildWorkflowOnly()
workflowExecution := types.WorkflowExecution{
WorkflowID: request.WorkflowExecution.WorkflowID,
RunID: request.WorkflowExecution.RunID,
}
// If firstExecutionRunID is set on the request always try to cancel currently running execution
if request.GetFirstExecutionRunID() == "" {
workflowExecution.RunID = request.WorkflowExecution.RunID
}

return workflow.UpdateCurrentWithActionFunc(
Expand All @@ -2660,8 +2682,24 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
return nil, workflow.ErrAlreadyCompleted
}

executionInfo := mutableState.GetExecutionInfo()
if request.GetFirstExecutionRunID() != "" {
firstRunID := executionInfo.FirstExecutionRunID
if firstRunID == "" {
// This is needed for backwards compatibility. Workflow execution create with Cadence release v0.25.0 or earlier
// does not have FirstExecutionRunID stored as part of mutable state. If this is not set then load it from
// workflow execution started event.
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return nil, err
}
firstRunID = startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstExecutionRunID()
}
if request.GetFirstExecutionRunID() != firstRunID {
return nil, &types.EntityNotExistsError{Message: "Workflow execution not found"}
}
}
if childWorkflowOnly {
executionInfo := mutableState.GetExecutionInfo()
parentWorkflowID := executionInfo.ParentWorkflowID
parentRunID := executionInfo.ParentRunID
if parentExecution.GetWorkflowID() != parentWorkflowID ||
Expand Down
10 changes: 5 additions & 5 deletions service/history/task/cross_cluster_target_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ func (s *crossClusterTargetTaskExecutorSuite) testApplyParentPolicyTaskWithFailu
Domain: "some random domain name",
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: "some random workflow id",
RunID: "some random run id",
},
Identity: "history-service",
RequestID: "",
Identity: "history-service",
RequestID: "",
FirstExecutionRunID: "some random run id",
},
}
}
Expand Down Expand Up @@ -450,7 +450,7 @@ func (s *crossClusterTargetTaskExecutorSuite) TestApplyParentClosePolicyTask_Suc
func(_ context.Context, request *types.HistoryRequestCancelWorkflowExecutionRequest, option ...yarpc.CallOption) error {
s.Equal(childAttr.ChildDomainID, request.GetDomainUUID())
s.Equal(childAttr.ChildWorkflowID, request.GetCancelRequest().GetWorkflowExecution().GetWorkflowID())
s.Equal(childAttr.ChildRunID, request.GetCancelRequest().GetWorkflowExecution().GetRunID())
s.Equal(childAttr.ChildRunID, request.GetCancelRequest().GetFirstExecutionRunID())
s.True(request.GetChildWorkflowOnly())
s.Equal(taskInfo.GetWorkflowID(), request.GetExternalWorkflowExecution().GetWorkflowID())
s.Equal(taskInfo.GetRunID(), request.GetExternalWorkflowExecution().GetRunID())
Expand All @@ -461,7 +461,7 @@ func (s *crossClusterTargetTaskExecutorSuite) TestApplyParentClosePolicyTask_Suc
func(_ context.Context, request *types.HistoryTerminateWorkflowExecutionRequest, option ...yarpc.CallOption) error {
s.Equal(childAttr.ChildDomainID, request.GetDomainUUID())
s.Equal(childAttr.ChildWorkflowID, request.GetTerminateRequest().GetWorkflowExecution().GetWorkflowID())
s.Equal(childAttr.ChildRunID, request.GetTerminateRequest().GetWorkflowExecution().GetRunID())
s.Equal(childAttr.ChildRunID, request.GetTerminateRequest().GetFirstExecutionRunID())
s.True(request.GetChildWorkflowOnly())
s.Equal(taskInfo.GetWorkflowID(), request.GetExternalWorkflowExecution().GetWorkflowID())
s.Equal(taskInfo.GetRunID(), request.GetExternalWorkflowExecution().GetRunID())
Expand Down
8 changes: 6 additions & 2 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2034,10 +2034,12 @@ func applyParentClosePolicy(
Domain: childDomainName,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: childStartedWorkflowID,
RunID: childStartedRunID,
},
Reason: "by parent close policy",
Identity: execution.IdentityHistoryService,
// Include StartedRunID as FirstExecutionRunID on the request to allow child to be terminated across runs.
// If the child does continue as new it still propagates the RunID of first execution.
FirstExecutionRunID: childStartedRunID,
},
ExternalWorkflowExecution: parentWorkflowExecution,
ChildWorkflowOnly: true,
Expand All @@ -2050,9 +2052,11 @@ func applyParentClosePolicy(
Domain: childDomainName,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: childStartedWorkflowID,
RunID: childStartedRunID,
},
Identity: execution.IdentityHistoryService,
// Include StartedRunID as FirstExecutionRunID on the request to allow child to be canceled across runs.
// If the child does continue as new it still propagates the RunID of first execution.
FirstExecutionRunID: childStartedRunID,
},
ExternalWorkflowExecution: parentWorkflowExecution,
ChildWorkflowOnly: true,
Expand Down
Loading

0 comments on commit 477c65b

Please sign in to comment.