Skip to content

Commit

Permalink
Allow generating workflow tasks if workflow is non-current (uber#4688)
Browse files Browse the repository at this point in the history
* finish implementation

* support workflow refresh on standby

* fix test

Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>
  • Loading branch information
yycptt and Shaddoll committed Jan 13, 2022
1 parent 27a0df2 commit a5c527f
Show file tree
Hide file tree
Showing 15 changed files with 281 additions and 58 deletions.
5 changes: 5 additions & 0 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ const (
// Update workflow, without current record
// NOTE: current record CANNOT point to the workflow to be updated
UpdateWorkflowModeBypassCurrent
// Update workflow, ignoring current record
// NOTE: current record may or may not point to the workflow
// this mode should only be used for (re-)generating workflow tasks
// and there's no other changes to the workflow
UpdateWorkflowModeIgnoreCurrent
)

// ConflictResolveWorkflowMode conflict resolve mode
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/nosql/nosqlExecutionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest

switch request.Mode {
case p.UpdateWorkflowModeIgnoreCurrent:
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
}
case p.UpdateWorkflowModeBypassCurrent:
if err := d.assertNotCurrentExecution(
ctx,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/operationModeValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ func ValidateUpdateWorkflowModeState(
}
return nil

case UpdateWorkflowModeIgnoreCurrent:
if newWorkflowState != nil {
return newInvalidUpdateWorkflowWithNewMode(
mode,
currentWorkflowState,
*newWorkflowState,
)
}
return nil

default:
return &types.InternalServiceError{
Message: fmt.Sprintf("unknown mode: %v", mode),
Expand Down
18 changes: 18 additions & 0 deletions common/persistence/operationModeValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,24 @@ func (s *validateOperationWorkflowModeStateSuite) TestUpdateMode_BypassCurrent()
}
}

func (s *validateOperationWorkflowModeStateSuite) TestUpdateMode_IgnoreCurrent() {
testMutation := s.newTestWorkflowMutation(WorkflowStateCompleted)
err := ValidateUpdateWorkflowModeState(
UpdateWorkflowModeIgnoreCurrent,
testMutation,
nil,
)
s.NoError(err)

testNewSnapshot := s.newTestWorkflowSnapshot(WorkflowStateRunning)
err = ValidateUpdateWorkflowModeState(
UpdateWorkflowModeIgnoreCurrent,
testMutation,
&testNewSnapshot,
)
s.Error(err)
}

func (s *validateOperationWorkflowModeStateSuite) TestConflictResolveMode_UpdateCurrent() {

// only reset workflow
Expand Down
83 changes: 82 additions & 1 deletion common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,87 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithZombieState() {
s.Equal(workflowExecutionRunning.GetRunID(), currentRunID)
}

// TestUpdateWorkflowExecutionTasks test
func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionTasks() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

domainID := "b0a8571c-0257-40ea-afcd-3a14eae181c0"
workflowExecution := types.WorkflowExecution{
WorkflowID: "update-workflow-tasks-test",
RunID: "5ba5e531-e46b-48d9-b4b3-859919839553",
}
task0, err0 := s.CreateWorkflowExecution(ctx, domainID, workflowExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil)
s.NoError(err0)
s.NotNil(task0, "Expected non empty task identifier.")

taskD, err := s.GetTransferTasks(ctx, 1, false)
s.Equal(1, len(taskD), "Expected 1 decision task.")
err = s.CompleteTransferTask(ctx, taskD[0].TaskID)
s.NoError(err)

state1, err := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
s.NoError(err)
info1 := state1.ExecutionInfo
s.NotNil(info1, "Valid Workflow info expected.")
updatedInfo1 := copyWorkflowExecutionInfo(info1)
updatedStats1 := copyExecutionStats(state1.ExecutionStats)

now := time.Now()
remoteClusterName := "remote-cluster"
transferTasks := []p.Task{
&p.ActivityTask{
VisibilityTimestamp: now,
TaskID: s.GetNextSequenceNumber(),
DomainID: domainID,
TaskList: "some randome tasklist name",
ScheduleID: 123,
Version: common.EmptyVersion,
},
}
timerTasks := []p.Task{
&p.UserTimerTask{
VisibilityTimestamp: now.Add(time.Minute),
TaskID: s.GetNextSequenceNumber(),
EventID: 124,
Version: common.EmptyVersion,
},
}
crossClusterTasks := []p.Task{
&p.CrossClusterApplyParentClosePolicyTask{
ApplyParentClosePolicyTask: p.ApplyParentClosePolicyTask{
VisibilityTimestamp: now,
TaskID: s.GetNextSequenceNumber(),
Version: common.EmptyVersion,
},
TargetCluster: remoteClusterName,
},
}

err = s.UpdateWorkflowExecutionTasks(
ctx,
updatedInfo1,
updatedStats1,
int64(3),
transferTasks,
timerTasks,
crossClusterTasks,
)
s.NoError(err)

loadedTransferTasks, err := s.GetTransferTasks(ctx, 10, true)
s.NoError(err)
s.Len(loadedTransferTasks, len(transferTasks))

loadedTimerTasks, err := s.GetTimerIndexTasks(ctx, 10, true)
s.NoError(err)
s.Len(loadedTimerTasks, len(timerTasks))

loadedCrossClusterTasks, err := s.GetCrossClusterTasks(ctx, remoteClusterName, 0, 10, true)
s.NoError(err)
s.Len(loadedCrossClusterTasks, len(crossClusterTasks))
}

// TestCreateWorkflowExecutionBrandNew test
func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionBrandNew() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
Expand Down Expand Up @@ -2016,7 +2097,7 @@ func (s *ExecutionManagerSuite) TestCancelTransferTaskTasks() {
TargetDomainID: targetDomainID,
TargetWorkflowID: targetWorkflowID,
TargetRunID: targetRunID,
TargetChildWorkflowOnly: false,
TargetChildWorkflowOnly: targetChildWorkflowOnly,
InitiatedID: 1,
}}
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
Expand Down
27 changes: 27 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,33 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(
return err
}

// UpdateWorkflowExecutionTasks is a utility method to update workflow tasks
// with IgnoreCurrent update mode.
func (s *TestBase) UpdateWorkflowExecutionTasks(
ctx context.Context,
updatedInfo *p.WorkflowExecutionInfo,
updatedStats *p.ExecutionStats,
condition int64,
transferTasks []p.Task,
timerTasks []p.Task,
crossClusterTasks []p.Task,
) error {
_, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &p.UpdateWorkflowExecutionRequest{
Mode: persistence.UpdateWorkflowModeIgnoreCurrent,
UpdateWorkflowMutation: p.WorkflowMutation{
ExecutionInfo: updatedInfo,
ExecutionStats: updatedStats,
TransferTasks: transferTasks,
TimerTasks: timerTasks,
CrossClusterTasks: crossClusterTasks,
Condition: condition,
},
RangeID: s.ShardInfo.RangeID,
Encoding: pickRandomEncoding(),
})
return err
}

// UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(
ctx context.Context,
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlExecutionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ func (m *sqlExecutionStore) updateWorkflowExecutionTx(
}

switch request.Mode {
case p.UpdateWorkflowModeIgnoreCurrent:
// no-op
case p.UpdateWorkflowModeBypassCurrent:
if err := assertNotCurrentExecution(
ctx,
Expand Down
60 changes: 60 additions & 0 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type (
currentWorkflowTransactionPolicy TransactionPolicy,
newWorkflowTransactionPolicy *TransactionPolicy,
) error
UpdateWorkflowExecutionTasks(
ctx context.Context,
now time.Time,
) error
}
)

Expand Down Expand Up @@ -601,6 +605,62 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNewAsPassive(
)
}

func (c *contextImpl) UpdateWorkflowExecutionTasks(
ctx context.Context,
now time.Time,
) (retError error) {

defer func() {
if retError != nil {
c.Clear()
}
}()

currentWorkflow, currentWorkflowEventsSeq, err := c.mutableState.CloseTransactionAsMutation(
now,
TransactionPolicyPassive,
)
if err != nil {
return err
}

if len(currentWorkflowEventsSeq) != 0 {
return types.InternalServiceError{
Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new history events",
}
}
currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
HistorySize: c.GetHistorySize(),
}

resp, err := c.updateWorkflowExecutionWithRetry(ctx, &persistence.UpdateWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: persistence.UpdateWorkflowModeIgnoreCurrent,
UpdateWorkflowMutation: *currentWorkflow,
// Encoding, this is set by shard context
})
if err != nil {
if c.isPersistenceTimeoutError(err) {
c.notifyTasksFromWorkflowMutation(currentWorkflow)
}
return err
}

// TODO remove updateCondition in favor of condition in mutable state
c.updateCondition = currentWorkflow.ExecutionInfo.NextEventID

// notify current workflow tasks
c.notifyTasksFromWorkflowMutation(currentWorkflow)

emitSessionUpdateStats(
c.metricsClient,
c.GetDomainName(),
resp.MutableStateUpdateSessionStats,
)

return nil
}

func (c *contextImpl) UpdateWorkflowExecutionWithNew(
ctx context.Context,
now time.Time,
Expand Down
14 changes: 14 additions & 0 deletions service/history/execution/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4027,7 +4027,7 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(
}

e.checksum = checksum
if err := e.cleanupTransaction(transactionPolicy); err != nil {
if err := e.cleanupTransaction(); err != nil {
return nil, nil, err
}
return workflowMutation, workflowEventsSeq, nil
Expand Down Expand Up @@ -4105,7 +4105,7 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot(
}

e.checksum = checksum
if err := e.cleanupTransaction(transactionPolicy); err != nil {
if err := e.cleanupTransaction(); err != nil {
return nil, nil, err
}
return workflowSnapshot, workflowEventsSeq, nil
Expand Down Expand Up @@ -4165,9 +4165,7 @@ func (e *mutableStateBuilder) prepareCloseTransaction(
)
}

func (e *mutableStateBuilder) cleanupTransaction(
transactionPolicy TransactionPolicy,
) error {
func (e *mutableStateBuilder) cleanupTransaction() error {

// Clear all updates to prepare for the next session
e.hBuilder = NewHistoryBuilder(e, e.logger)
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3301,7 +3301,7 @@ func (e *historyEngineImpl) RefreshWorkflowTasks(
domainUUID string,
workflowExecution types.WorkflowExecution,
) (retError error) {
domainEntry, err := e.shard.GetDomainCache().GetActiveDomainByID(domainUUID)
domainEntry, err := e.shard.GetDomainCache().GetDomainByID(domainUUID)
if err != nil {
return err
}
Expand Down Expand Up @@ -3332,7 +3332,7 @@ func (e *historyEngineImpl) RefreshWorkflowTasks(
return err
}

err = wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now())
err = wfContext.UpdateWorkflowExecutionTasks(ctx, e.shard.GetTimeSource().Now())
if err != nil {
return err
}
Expand Down
11 changes: 1 addition & 10 deletions service/history/task/cross_cluster_source_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,16 +531,7 @@ func (t *crossClusterSourceTaskExecutor) generateNewTask(
return err
}

now := t.shard.GetTimeSource().Now()
isActive := clusterMetadata.ClusterNameForFailoverVersion(mutableState.GetCurrentVersion()) ==
clusterMetadata.GetCurrentClusterName()

var err error
if isActive {
err = wfContext.UpdateWorkflowExecutionAsActive(ctx, now)
} else {
err = wfContext.UpdateWorkflowExecutionAsPassive(ctx, now)
}
err := wfContext.UpdateWorkflowExecutionTasks(ctx, t.shard.GetTimeSource().Now())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit a5c527f

Please sign in to comment.