Skip to content

Commit

Permalink
Generate parentClosePolicy task for x-cluster child (uber#4682)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Dec 14, 2021
1 parent 9540236 commit b7d2c77
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 49 deletions.
24 changes: 11 additions & 13 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1818,7 +1818,10 @@ func (t *transferActiveTaskExecutor) applyParentClosePolicyDomainActiveCheck(
sameClusterChildDomainIDs := make(map[int64]string) // child init eventID -> child domainID
remoteClusters := make(map[string]map[string]struct{})
parentClosePolicyWorkerEnabled := t.shard.GetConfig().EnableParentClosePolicyWorker()
if parentClosePolicyWorkerEnabled && len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) {
if parentClosePolicyWorkerEnabled &&
len(childInfos) >= t.shard.GetConfig().ParentClosePolicyThreshold(domainName) {
// only signal parent close policy workflow when # of child workflow exceeds threshold
// the system workflow can handle the case where child domain is active in a different cluster
return nil, nil, true, nil
}

Expand Down Expand Up @@ -1852,19 +1855,14 @@ func (t *transferActiveTaskExecutor) applyParentClosePolicyDomainActiveCheck(
}

generators := []generatorF{}
if !parentClosePolicyWorkerEnabled {
for remoteCluster, targetDomainIDs := range remoteClusters {
generators = append(
generators,
func(taskGenerator execution.MutableStateTaskGenerator) error {
return taskGenerator.GenerateCrossClusterApplyParentClosePolicyTask(task, remoteCluster, targetDomainIDs)
})
}
return generators, sameClusterChildDomainIDs, false, nil
for remoteCluster, targetDomainIDs := range remoteClusters {
generators = append(
generators,
func(taskGenerator execution.MutableStateTaskGenerator) error {
return taskGenerator.GenerateCrossClusterApplyParentClosePolicyTask(task, remoteCluster, targetDomainIDs)
})
}
// if enabled, those cross cluster children will be handled by system workflow

return generators, sameClusterChildDomainIDs, len(remoteClusters) != 0, nil
return generators, sameClusterChildDomainIDs, false, nil
}

func (t *transferActiveTaskExecutor) processParentClosePolicy(
Expand Down
42 changes: 6 additions & 36 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,23 +606,14 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_SameClusterChild
)
}

func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChild_Abandon() {
func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterAbandonedChild_Abandon() {
s.testProcessCloseExecutionNoParentHasFewChildren(
map[string]string{
"child_abandon": s.remoteTargetDomainName,
"child_terminate": s.domainName,
"child_cancel": s.domainName,
},
func() {
// s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
// func(request parentclosepolicy.Request) bool {
// fmt.Println(request.Executions)
// return len(request.Executions) == 3
// },
// )).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
s.expectCancelRequest(s.domainName)
s.expectTerminateRequest(s.domainName)
},
Expand All @@ -637,15 +628,8 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChil
"child_cancel": s.childDomainName,
},
func() {
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
return len(request.Executions) == 2
},
)).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
// s.expectCancelRequest(s.childDomainName)
s.expectCrossClusterApplyParentPolicyCalls()
s.expectCancelRequest(s.childDomainName)
},
)
}
Expand All @@ -658,15 +642,8 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChil
"child_cancel": s.remoteTargetDomainName,
},
func() {
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
return len(request.Executions) == 2
},
)).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
// s.expectTerminateRequest(s.domainName)
s.expectCrossClusterApplyParentPolicyCalls()
s.expectTerminateRequest(s.domainName)
},
)
}
Expand All @@ -679,14 +656,7 @@ func (s *transferActiveTaskExecutorSuite) TestApplyParentPolicy_CrossClusterChil
"child_cancel": s.remoteTargetDomainName,
},
func() {
s.mockParentClosePolicyClient.On("SendParentClosePolicyRequest", mock.Anything, mock.MatchedBy(
func(request parentclosepolicy.Request) bool {
return len(request.Executions) == 2
},
)).Return(nil).Times(1)
// TODO: uncomment the following and remove ParentClosePolicyClient mock after
// cross cluster apply parent close task is fixed
// s.expectCrossClusterApplyParentPolicyCalls()
s.expectCrossClusterApplyParentPolicyCalls()
},
)
}
Expand Down

0 comments on commit b7d2c77

Please sign in to comment.