Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert transfer to cross cluster task if target domain is active in remote cluster #4268

Merged
merged 6 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package execution

import (
"errors"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -77,6 +78,10 @@ type (
) error
GenerateWorkflowSearchAttrTasks() error
GenerateWorkflowResetTasks() error
GenerateCrossClusterTaskFromTransferTask(
transferTask *persistence.TransferTaskInfo,
targetCluster string,
) error

// these 2 APIs should only be called when mutable state transaction is being closed
GenerateActivityTimerTasks(
Expand Down Expand Up @@ -547,6 +552,67 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowResetTasks() error {
return nil
}

// GenerateCrossClusterTaskFromTransferTask converts the given transfer task
// into the matching cross-cluster task aimed at targetCluster and adds it to
// the mutable state.
//
// Only cancel-execution, signal-execution and start-child-execution transfer
// tasks can be converted. An error is returned for any other task type, and
// also when targetCluster is the current cluster.
func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterTaskFromTransferTask(
	task *persistence.TransferTaskInfo,
	targetCluster string,
) error {
	currentCluster := r.clusterMetadata.GetCurrentClusterName()
	if targetCluster == currentCluster {
		// not expected to happen: a cross-cluster task must target a remote cluster
		return errors.New("unable to create cross-cluster task for current cluster")
	}

	// In every case below the TaskID is left unset; it is set by shard context.
	var converted persistence.Task
	switch task.TaskType {
	case persistence.TransferTaskTypeCancelExecution:
		cancelTask := persistence.CancelExecutionTask{
			TargetDomainID:          task.TargetDomainID,
			TargetWorkflowID:        task.TargetWorkflowID,
			TargetRunID:             task.TargetRunID,
			TargetChildWorkflowOnly: task.TargetChildWorkflowOnly,
			InitiatedID:             task.ScheduleID,
			Version:                 task.Version,
		}
		converted = &persistence.CrossClusterCancelExecutionTask{
			TargetCluster:       targetCluster,
			CancelExecutionTask: cancelTask,
		}

	case persistence.TransferTaskTypeSignalExecution:
		signalTask := persistence.SignalExecutionTask{
			TargetDomainID:          task.TargetDomainID,
			TargetWorkflowID:        task.TargetWorkflowID,
			TargetRunID:             task.TargetRunID,
			TargetChildWorkflowOnly: task.TargetChildWorkflowOnly,
			InitiatedID:             task.ScheduleID,
			Version:                 task.Version,
		}
		converted = &persistence.CrossClusterSignalExecutionTask{
			TargetCluster:       targetCluster,
			SignalExecutionTask: signalTask,
		}

	case persistence.TransferTaskTypeStartChildExecution:
		startChildTask := persistence.StartChildExecutionTask{
			TargetDomainID:   task.TargetDomainID,
			TargetWorkflowID: task.TargetWorkflowID,
			InitiatedID:      task.ScheduleID,
			Version:          task.Version,
		}
		converted = &persistence.CrossClusterStartChildExecutionTask{
			TargetCluster:           targetCluster,
			StartChildExecutionTask: startChildTask,
		}

	// TODO: add the case for TransferTaskTypeCloseExecution
	default:
		return fmt.Errorf("unable to convert transfer task of type %v to cross-cluster task", task.TaskType)
	}

	// Carry over the visibility timestamp of the original transfer task so
	// that the task latency metric also includes the latency of that
	// transfer task.
	converted.SetVisibilityTimestamp(task.VisibilityTimestamp)
	r.mutableState.AddCrossClusterTasks(converted)

	return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateActivityTimerTasks(
now time.Time,
) error {
Expand Down
15 changes: 15 additions & 0 deletions service/history/execution/mutable_state_task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 99 additions & 0 deletions service/history/execution/mutable_state_task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,105 @@ func (s *mutableStateTaskGeneratorSuite) TestIsCrossClusterTask() {
}
}

// TestGenerateCrossClusterTaskFromTransferTask checks the conversion of
// transfer tasks into cross-cluster tasks: an unsupported transfer task type
// must produce an error, while cancel, signal and start-child transfer tasks
// must produce the corresponding cross-cluster task carrying the same target
// fields, version and visibility timestamp.
func (s *mutableStateTaskGeneratorSuite) TestGenerateCrossClusterTaskFromTransferTask() {
	targetCluster := cluster.TestAlternativeClusterName
	now := time.Now()
	testCases := []struct {
		transferTask             *persistence.TransferTaskInfo
		expectError              bool
		expectedCrossClusterTask persistence.Task
	}{
		{
			transferTask: &persistence.TransferTaskInfo{
				TaskType: persistence.TransferTaskTypeActivityTask,
			},
			expectError: true,
		},
		{
			transferTask: &persistence.TransferTaskInfo{
				TaskType:                persistence.TransferTaskTypeCancelExecution,
				TargetDomainID:          constants.TestTargetDomainID,
				TargetWorkflowID:        constants.TestWorkflowID,
				TargetRunID:             constants.TestRunID,
				TargetChildWorkflowOnly: false,
				ScheduleID:              int64(123),
			},
			expectError: false,
			expectedCrossClusterTask: &persistence.CrossClusterCancelExecutionTask{
				TargetCluster: targetCluster,
				CancelExecutionTask: persistence.CancelExecutionTask{
					TargetDomainID:          constants.TestTargetDomainID,
					TargetWorkflowID:        constants.TestWorkflowID,
					TargetRunID:             constants.TestRunID,
					TargetChildWorkflowOnly: false,
					InitiatedID:             int64(123),
				},
			},
		},
		{
			transferTask: &persistence.TransferTaskInfo{
				TaskType:                persistence.TransferTaskTypeSignalExecution,
				TargetDomainID:          constants.TestTargetDomainID,
				TargetWorkflowID:        constants.TestWorkflowID,
				TargetRunID:             constants.TestRunID,
				TargetChildWorkflowOnly: false,
				ScheduleID:              int64(123),
			},
			expectError: false,
			expectedCrossClusterTask: &persistence.CrossClusterSignalExecutionTask{
				TargetCluster: targetCluster,
				SignalExecutionTask: persistence.SignalExecutionTask{
					TargetDomainID:          constants.TestTargetDomainID,
					TargetWorkflowID:        constants.TestWorkflowID,
					TargetRunID:             constants.TestRunID,
					TargetChildWorkflowOnly: false,
					InitiatedID:             int64(123),
				},
			},
		},
		{
			transferTask: &persistence.TransferTaskInfo{
				TaskType:         persistence.TransferTaskTypeStartChildExecution,
				TargetDomainID:   constants.TestTargetDomainID,
				TargetWorkflowID: constants.TestWorkflowID,
				ScheduleID:       int64(123),
			},
			expectError: false,
			expectedCrossClusterTask: &persistence.CrossClusterStartChildExecutionTask{
				TargetCluster: targetCluster,
				StartChildExecutionTask: persistence.StartChildExecutionTask{
					TargetDomainID:   constants.TestTargetDomainID,
					TargetWorkflowID: constants.TestWorkflowID,
					InitiatedID:      int64(123),
				},
			},
		},
	}

	for _, tc := range testCases {
		// Captured from the mocked AddCrossClusterTasks call of this iteration.
		var actualCrossClusterTask persistence.Task
		if !tc.expectError {
			// Version and visibility timestamp must be copied from the
			// transfer task, so set them on both the input and the expectation.
			tc.transferTask.Version = int64(101)
			tc.expectedCrossClusterTask.SetVersion(int64(101))
			tc.transferTask.VisibilityTimestamp = now
			tc.expectedCrossClusterTask.SetVisibilityTimestamp(now)

			s.mockMutableState.EXPECT().AddCrossClusterTasks(gomock.Any()).Do(
				func(crossClusterTasks ...persistence.Task) {
					actualCrossClusterTask = crossClusterTasks[0]
				},
			).MaxTimes(1)
		}

		err := s.taskGenerator.GenerateCrossClusterTaskFromTransferTask(tc.transferTask, targetCluster)
		if tc.expectError {
			s.Error(err)
			continue
		}

		// Assert on the error first so that an unexpected failure is reported
		// as such instead of as a nil-vs-expected task mismatch.
		s.NoError(err)
		s.Equal(tc.expectedCrossClusterTask, actualCrossClusterTask)
	}
}

func (s *mutableStateTaskGeneratorSuite) TestGetNextDecisionTimeout() {
defaultStartToCloseTimeout := 10 * time.Second
expectedResult := []time.Duration{
Expand Down
5 changes: 4 additions & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,10 @@ func (s *contextImpl) allocateTransferIDsLocked(
}
s.logger.Debug(fmt.Sprintf("Assigning task ID: %v", id))
task.SetTaskID(id)
task.SetVisibilityTimestamp(now)
// only set the task visibility timestamp if it has not already been set
if task.GetVisibilityTimestamp().IsZero() {
task.SetVisibilityTimestamp(now)
}
*transferMaxReadLevel = id
}
return nil
Expand Down
84 changes: 72 additions & 12 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -403,11 +404,18 @@ func (t *transferActiveTaskExecutor) processCancelExecution(
return err
}

targetDomainName, err := t.shard.GetDomainCache().GetDomainName(task.TargetDomainID)
targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID)
if err != nil {
// TODO: handle the case where target domain does not exist
return err
}

if targetCluster, isCrossCluster := t.isCrossClusterTask(targetDomainEntry); isCrossCluster {
return t.generateCrossClusterTask(ctx, wfContext, task, targetCluster)
}

targetDomainName := targetDomainEntry.GetInfo().Name

// handle workflow cancel itself
if task.DomainID == task.TargetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
Expand Down Expand Up @@ -495,11 +503,18 @@ func (t *transferActiveTaskExecutor) processSignalExecution(
return err
}

targetDomainName, err := t.shard.GetDomainCache().GetDomainName(task.TargetDomainID)
targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID)
if err != nil {
// TODO: handle the case where target domain does not exist
return err
}

if targetCluster, isCrossCluster := t.isCrossClusterTask(targetDomainEntry); isCrossCluster {
return t.generateCrossClusterTask(ctx, wfContext, task, targetCluster)
}

targetDomainName := targetDomainEntry.GetInfo().Name

// handle workflow signal itself
if task.DomainID == task.TargetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
Expand Down Expand Up @@ -610,16 +625,6 @@ func (t *transferActiveTaskExecutor) processStartChildExecution(
domainName = task.DomainID
}

// Get target domain name
var targetDomainName string
if targetDomainName, err = t.shard.GetDomainCache().GetDomainName(task.TargetDomainID); err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
return err
}
// it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event
targetDomainName = task.TargetDomainID
}

initiatedEventID := task.ScheduleID
childInfo, ok := mutableState.GetChildExecutionInfo(initiatedEventID)
if !ok {
Expand All @@ -630,6 +635,25 @@ func (t *transferActiveTaskExecutor) processStartChildExecution(
return err
}

// Get target domain name
var targetDomainName string
var targetDomainEntry *cache.DomainCacheEntry
if targetDomainEntry, err = t.shard.GetDomainCache().GetDomainByID(task.TargetDomainID); err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
return err
}
// TODO: handle the case where target domain does not exist

// it is possible that the domain got deleted. Use domainID instead as this is only needed for the history event
targetDomainName = task.TargetDomainID
yycptt marked this conversation as resolved.
Show resolved Hide resolved
} else {
if targetCluster, isCrossCluster := t.isCrossClusterTask(targetDomainEntry); isCrossCluster {
return t.generateCrossClusterTask(ctx, wfContext, task, targetCluster)
}

targetDomainName = targetDomainEntry.GetInfo().Name
}

initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, initiatedEventID)
if err != nil {
return err
Expand Down Expand Up @@ -1171,6 +1195,42 @@ func (t *transferActiveTaskExecutor) signalExternalExecutionFailed(
return err
}

// isCrossClusterTask reports whether the target domain's active cluster is a
// cluster other than the current one. When it is, the name of that active
// cluster is returned together with true; otherwise the result is "" and
// false.
func (t *transferActiveTaskExecutor) isCrossClusterTask(
	targetDomainEntry *cache.DomainCacheEntry,
) (string, bool) {
	activeCluster := targetDomainEntry.GetReplicationConfig().ActiveClusterName
	currentCluster := t.shard.GetClusterMetadata().GetCurrentClusterName()

	if activeCluster == currentCluster {
		return "", false
	}
	return activeCluster, true
}

// generateCrossClusterTask runs an update of the workflow execution, via
// updateWorkflowExecution, in which the given transfer task is converted into
// a cross-cluster task for targetCluster.
//
// The update action returns a WorkflowExecutionAlreadyCompletedError when the
// workflow is no longer running; otherwise it returns whatever the task
// generator's GenerateCrossClusterTaskFromTransferTask returns.
func (t *transferActiveTaskExecutor) generateCrossClusterTask(
	ctx context.Context,
	wfContext execution.Context,
	task *persistence.TransferTaskInfo,
	targetCluster string,
) error {
	convertTask := func(_ context.Context, ms execution.MutableState) error {
		if ms.IsWorkflowExecutionRunning() {
			generator := execution.NewMutableStateTaskGenerator(
				t.shard.GetClusterMetadata(),
				t.shard.GetDomainCache(),
				t.logger,
				ms,
			)
			return generator.GenerateCrossClusterTaskFromTransferTask(task, targetCluster)
		}

		return &types.WorkflowExecutionAlreadyCompletedError{Message: "Workflow execution already completed."}
	}

	// NOTE(review): the meaning of the boolean flag is defined by
	// updateWorkflowExecution, which is not visible here; the value is
	// passed through unchanged.
	return t.updateWorkflowExecution(ctx, wfContext, false, convertTask)
}

func (t *transferActiveTaskExecutor) updateWorkflowExecution(
ctx context.Context,
wfContext execution.Context,
Expand Down
Loading