Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate parentClosePolicy task for x-cluster child #4682

Merged
merged 1 commit into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1818,7 +1818,10 @@ func (t *transferActiveTaskExecutor) applyParentClosePolicyDomainActiveCheck(
sameClusterChildDomainIDs := make(map[int64]string) // child init eventID -> child domainID
remoteClusters := make(map[string]map[string]struct{})
parentClosePolicyWorkerEnabled := t.shard.GetConfig().EnableParentClosePolicyWorker()
if parentClosePolicyWorkerEnabled && len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) {
if parentClosePolicyWorkerEnabled &&
len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) {
// only signal parent close policy workflow when # of child workflow exceeds threshold
// the system workflow can handle the case where child domain is active in a different cluster
return nil, nil, true, nil
}

Expand Down Expand Up @@ -1852,19 +1855,14 @@ func (t *transferActiveTaskExecutor) applyParentClosePolicyDomainActiveCheck(
}

generators := []generatorF{}
if !parentClosePolicyWorkerEnabled {
for remoteCluster, targetDomainIDs := range remoteClusters {
generators = append(
generators,
func(taskGenerator execution.MutableStateTaskGenerator) error {
return taskGenerator.GenerateCrossClusterApplyParentClosePolicyTask(task, remoteCluster, targetDomainIDs)
})
}
return generators, sameClusterChildDomainIDs, false, nil
for remoteCluster, targetDomainIDs := range remoteClusters {
generators = append(
generators,
func(taskGenerator execution.MutableStateTaskGenerator) error {
return taskGenerator.GenerateCrossClusterApplyParentClosePolicyTask(task, remoteCluster, targetDomainIDs)
})
}
// if enabled, those cross cluster children will be handled by system workflow

return generators, sameClusterChildDomainIDs, len(remoteClusters) != 0, nil
return generators, sameClusterChildDomainIDs, false, nil
}

func (t *transferActiveTaskExecutor) processParentClosePolicy(
Expand Down
42 changes: 6 additions & 36 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,23 +606,14 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_SameClusterChild
)
}

func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChild_Abandon() {
func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterAbandonedChild_Abandon() {
s.testProcessCloseExecutionNoParentHasFewChildren(
map[string]string{
"child_abandon": s.remoteTargetDomainName,
"child_terminate": s.domainName,
"child_cancel": s.domainName,
},
func() {
// s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
// func(request parentclosepolicy.Request) bool {
// fmt.Println(request.Executions)
// return len(request.Executions) == 3
// },
// )).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
s.expectCancelRequest(s.domainName)
s.expectTerminateRequest(s.domainName)
},
Expand All @@ -637,15 +628,8 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChil
"child_cancel": s.childDomainName,
},
func() {
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
return len(request.Executions) == 2
},
)).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
// s.expectCancelRequest(s.childDomainName)
s.expectCrossClusterApplyParentPolicyCalls()
s.expectCancelRequest(s.childDomainName)
},
)
}
Expand All @@ -658,15 +642,8 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChil
"child_cancel": s.remoteTargetDomainName,
},
func() {
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
return len(request.Executions) == 2
},
)).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
// s.expectTerminateRequest(s.domainName)
s.expectCrossClusterApplyParentPolicyCalls()
s.expectTerminateRequest(s.domainName)
},
)
}
Expand All @@ -679,14 +656,7 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChil
"child_cancel": s.remoteTargetDomainName,
},
func() {
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
return len(request.Executions) == 2
},
)).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
s.expectCrossClusterApplyParentPolicyCalls()
},
)
}
Expand Down