diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 76c5f4cc914..354f9f41aa4 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2452,24 +2452,26 @@ func (e *historyEngineImpl) ReapplyEvents( domainID, execution, func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) { - createDecisionTask := true + + postActions := &updateWorkflowAction{ + createDecision: true, + } // Do not create decision task when the workflow is cron and the cron has not been started yet if msBuilder.GetExecutionInfo().CronSchedule != "" && !msBuilder.HasProcessedOrPendingDecision() { - createDecisionTask = false + postActions.createDecision = false } // TODO when https://github.com/uber/cadence/issues/2420 is finished // reset to workflow finish event // ignore this case for now if !msBuilder.IsWorkflowExecutionRunning() { - e.logger.Warn("failed to reapply event to a finished workflow", + e.logger.Warn("cannot reapply event to a finished workflow", tag.WorkflowDomainID(domainID), tag.WorkflowID(workflowID), ) e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) - return nil, nil - } - postActions := &updateWorkflowAction{ - createDecision: createDecisionTask, + return &updateWorkflowAction{ + noop: true, + }, nil } if err := e.eventsReapplier.reapplyEvents( ctx, diff --git a/service/history/nDCTransactionMgr.go b/service/history/nDCTransactionMgr.go index 246bd8284c3..80e64640ac2 100644 --- a/service/history/nDCTransactionMgr.go +++ b/service/history/nDCTransactionMgr.go @@ -227,6 +227,7 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow( } mode := persistence.UpdateWorkflowModeUpdateCurrent + transactionPolicy := transactionPolicyPassive // since we are not rebuilding the mutable state then we // can trust the result from IsCurrentWorkflowGuaranteed if !targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() { @@ -265,9 +266,9 @@ func (r 
*nDCTransactionMgrImpl) backfillWorkflow( // target workflow is active && target workflow is current workflow // we need to reapply events here, rather than using reapplyEvents // within workflow execution context, or otherwise deadlock will appear - if targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() { // case 1 + transactionPolicy = transactionPolicyActive if err := r.eventsReapplier.reapplyEvents( ctx, targetWorkflow.getMutableState(), @@ -277,11 +278,18 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow( } } else if mode == persistence.UpdateWorkflowModeUpdateCurrent { // case 2 - r.logger.Warn("failed to reapply event to a finished workflow", + transactionPolicy = transactionPolicyActive + r.logger.Warn("cannot reapply event to a finished workflow", tag.WorkflowDomainID(targetWorkflowEvents.DomainID), tag.WorkflowID(targetWorkflowEvents.WorkflowID), ) r.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) + // TODO when https://github.com/uber/cadence/issues/2420 is finished + // reset to workflow finish event and reapply to new resetted workflow by using + // transactionPolicyActive, ignore this case for now + } else { + // target workflow is active but not being pointed by current record + // do not need to handle events reapplication here } } @@ -290,7 +298,7 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow( mode, nil, nil, - transactionPolicyPassive, + transactionPolicy, nil, ) } diff --git a/service/history/nDCTransactionMgrForExistingWorkflow.go b/service/history/nDCTransactionMgrForExistingWorkflow.go index 44ed5533337..c997447fb64 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow.go @@ -314,6 +314,9 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCur if err != nil { return err } + if err := targetWorkflow.revive(); err != nil { + return err + } } var newContext workflowExecutionContext 
diff --git a/service/history/nDCTransactionMgrForExistingWorkflow_test.go b/service/history/nDCTransactionMgrForExistingWorkflow_test.go index a70964f886a..a7bf4d7ada5 100644 --- a/service/history/nDCTransactionMgrForExistingWorkflow_test.go +++ b/service/history/nDCTransactionMgrForExistingWorkflow_test.go @@ -216,6 +216,7 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf currentWorkflowPolicy := transactionPolicyPassive currentMutableState.On("IsWorkflowExecutionRunning").Return(true) currentWorkflow.EXPECT().suppressWorkflowBy(targetWorkflow).Return(currentWorkflowPolicy, nil).Times(1) + targetWorkflow.EXPECT().revive().Return(nil).Times(1) targetContext.On( "conflictResolveWorkflowExecution", @@ -426,6 +427,7 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf currentWorkflowPolicy := transactionPolicyActive currentMutableState.On("IsWorkflowExecutionRunning").Return(true) currentWorkflow.EXPECT().suppressWorkflowBy(targetWorkflow).Return(currentWorkflowPolicy, nil).Times(1) + targetWorkflow.EXPECT().revive().Return(nil).Times(1) targetContext.On( "conflictResolveWorkflowExecution", diff --git a/service/history/nDCTransactionMgrForNewWorkflow.go b/service/history/nDCTransactionMgrForNewWorkflow.go index 966dbf53819..5d4d2756052 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow.go +++ b/service/history/nDCTransactionMgrForNewWorkflow.go @@ -262,6 +262,9 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) suppressCurrentAndCreateAsCurrent( if err != nil { return err } + if err := targetWorkflow.revive(); err != nil { + return err + } return currentWorkflow.getContext().updateWorkflowExecutionWithNew( now, diff --git a/service/history/nDCTransactionMgrForNewWorkflow_test.go b/service/history/nDCTransactionMgrForNewWorkflow_test.go index c0d903f60b6..a4cf884d649 100644 --- a/service/history/nDCTransactionMgrForNewWorkflow_test.go +++ b/service/history/nDCTransactionMgrForNewWorkflow_test.go 
@@ -353,6 +353,7 @@ func (s *nDCTransactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_Suppre currentMutableState.On("IsWorkflowExecutionRunning").Return(true) currentWorkflowPolicy := transactionPolicyActive currentWorkflow.EXPECT().suppressWorkflowBy(targetWorkflow).Return(currentWorkflowPolicy, nil).Times(1) + targetWorkflow.EXPECT().revive().Return(nil).Times(1) currentContext.On( "updateWorkflowExecutionWithNew", diff --git a/service/history/nDCTransactionMgr_test.go b/service/history/nDCTransactionMgr_test.go index 7cbcece4994..c99052c66c3 100644 --- a/service/history/nDCTransactionMgr_test.go +++ b/service/history/nDCTransactionMgr_test.go @@ -136,7 +136,7 @@ func (s *nDCTransactionMgrSuite) TestUpdateWorkflow() { s.NoError(err) } -func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_ReapplyEvents() { +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_Active_ReapplyEvents() { ctx := ctx.Background() now := time.Now() currentVersion := int64(1234) @@ -167,14 +167,14 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_ReapplyE "persistNonFirstWorkflowEvents", workflowEvents, ).Return(int64(0), nil).Once() context.On( - "updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyPassive, (*transactionPolicy)(nil), + "updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyActive, (*transactionPolicy)(nil), ).Return(nil).Once() err := s.transactionMgr.backfillWorkflow(ctx, now, workflow, workflowEvents) s.NoError(err) s.True(releaseCalled) } -func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_NoReapplyEvents() { +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_Passive_NoReapplyEvents() { ctx := ctx.Background() now := time.Now() currentVersion := int64(1234) @@ -211,7 +211,7 @@ func (s *nDCTransactionMgrSuite) 
TestBackfillWorkflow_CurrentGuaranteed_NoReappl s.True(releaseCalled) } -func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent() { +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent_Active() { ctx := ctx.Background() now := time.Now() currentVersion := int64(1234) @@ -269,7 +269,65 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent() { s.True(releaseCalled) } -func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current() { +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent_Passive() { + ctx := ctx.Background() + now := time.Now() + currentVersion := int64(1234) + + domainID := "some random domain ID" + workflowID := "some random workflow ID" + runID := "some random run ID" + currentRunID := "other random run ID" + + releaseCalled := false + + workflow := NewMocknDCWorkflow(s.controller) + context := &mockWorkflowExecutionContext{} + defer context.AssertExpectations(s.T()) + mutableState := &mockMutableState{} + defer mutableState.AssertExpectations(s.T()) + var releaseFn releaseWorkflowExecutionFunc = func(error) { releaseCalled = true } + + workflowEvents := &persistence.WorkflowEvents{ + Events: []*shared.HistoryEvent{{ + EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled), + }}, + DomainID: domainID, + WorkflowID: workflowID, + } + + workflow.EXPECT().getContext().Return(context).AnyTimes() + workflow.EXPECT().getMutableState().Return(mutableState).AnyTimes() + workflow.EXPECT().getReleaseFn().Return(releaseFn).AnyTimes() + + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(cluster.TestCurrentClusterName) + s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestAlternativeClusterName) + + mutableState.On("IsCurrentWorkflowGuaranteed").Return(false) + mutableState.On("GetCurrentVersion").Return(currentVersion) + mutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ + 
DomainID: domainID, + WorkflowID: workflowID, + RunID: runID, + }) + + s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ + DomainID: domainID, + WorkflowID: workflowID, + }).Return(&persistence.GetCurrentExecutionResponse{RunID: currentRunID}, nil).Once() + + context.On( + "persistNonFirstWorkflowEvents", workflowEvents, + ).Return(int64(0), nil).Once() + context.On( + "updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, transactionPolicyPassive, (*transactionPolicy)(nil), + ).Return(nil).Once() + err := s.transactionMgr.backfillWorkflow(ctx, now, workflow, workflowEvents) + s.NoError(err) + s.True(releaseCalled) +} + +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current_Active() { ctx := ctx.Background() now := time.Now() currentVersion := int64(1234) @@ -309,6 +367,58 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current() { WorkflowID: workflowID, }).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once() + context.On( + "persistNonFirstWorkflowEvents", workflowEvents, + ).Return(int64(0), nil).Once() + context.On( + "updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyActive, (*transactionPolicy)(nil), + ).Return(nil).Once() + + err := s.transactionMgr.backfillWorkflow(ctx, now, workflow, workflowEvents) + s.NoError(err) + s.True(releaseCalled) +} + +func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current_Passive() { + ctx := ctx.Background() + now := time.Now() + currentVersion := int64(1234) + + domainID := "some random domain ID" + workflowID := "some random workflow ID" + runID := "some random run ID" + + releaseCalled := false + + workflow := NewMocknDCWorkflow(s.controller) + context := &mockWorkflowExecutionContext{} + defer context.AssertExpectations(s.T()) + mutableState := &mockMutableState{} + defer mutableState.AssertExpectations(s.T()) + var 
releaseFn releaseWorkflowExecutionFunc = func(error) { releaseCalled = true } + + workflowEvents := &persistence.WorkflowEvents{} + + workflow.EXPECT().getContext().Return(context).AnyTimes() + workflow.EXPECT().getMutableState().Return(mutableState).AnyTimes() + workflow.EXPECT().getReleaseFn().Return(releaseFn).AnyTimes() + + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(cluster.TestCurrentClusterName) + s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestAlternativeClusterName) + + mutableState.On("IsCurrentWorkflowGuaranteed").Return(false) + mutableState.On("GetCurrentVersion").Return(currentVersion) + mutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ + DomainID: domainID, + WorkflowID: workflowID, + RunID: runID, + }) + + s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ + DomainID: domainID, + WorkflowID: workflowID, + }).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once() + context.On( "persistNonFirstWorkflowEvents", workflowEvents, ).Return(int64(0), nil).Once() diff --git a/service/history/nDCWorkflow.go b/service/history/nDCWorkflow.go index e2bce16b0a0..b9d4d16ce6e 100644 --- a/service/history/nDCWorkflow.go +++ b/service/history/nDCWorkflow.go @@ -40,6 +40,7 @@ type ( getReleaseFn() releaseWorkflowExecutionFunc getVectorClock() (int64, int64, error) happensAfter(that nDCWorkflow) (bool, error) + revive() error suppressWorkflowBy(incomingWorkflow nDCWorkflow) (transactionPolicy, error) flushBufferedEvents() error } @@ -126,6 +127,28 @@ func (r *nDCWorkflowImpl) happensAfter( ), nil } +// revive moves a zombie workflow back to the created / running state, +// a workflow in any other state is left untouched +func (r *nDCWorkflowImpl) revive() error { + + state, _ := r.mutableState.GetWorkflowStateCloseStatus() + if state != persistence.WorkflowStateZombie { + // only a zombie workflow needs reviving, this also covers an already finished workflow + return nil + } + + // workflow is in zombie state, need to set the state 
accordingly: + // running when HasProcessedOrPendingDecision reports true, created otherwise + state = persistence.WorkflowStateCreated + if r.mutableState.HasProcessedOrPendingDecision() { + state = persistence.WorkflowStateRunning + } + return r.mutableState.UpdateWorkflowStateCloseStatus( + state, + persistence.WorkflowCloseStatusNone, + ) +} + func (r *nDCWorkflowImpl) suppressWorkflowBy( incomingWorkflow nDCWorkflow, ) (transactionPolicy, error) { diff --git a/service/history/nDCWorkflow_mock.go b/service/history/nDCWorkflow_mock.go index 7d076377dec..447ddea6953 100644 --- a/service/history/nDCWorkflow_mock.go +++ b/service/history/nDCWorkflow_mock.go @@ -128,6 +128,20 @@ func (mr *MocknDCWorkflowMockRecorder) happensAfter(that interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "happensAfter", reflect.TypeOf((*MocknDCWorkflow)(nil).happensAfter), that) } +// revive mocks base method +func (m *MocknDCWorkflow) revive() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "revive") + ret0, _ := ret[0].(error) + return ret0 +} + +// revive indicates an expected call of revive +func (mr *MocknDCWorkflowMockRecorder) revive() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "revive", reflect.TypeOf((*MocknDCWorkflow)(nil).revive)) +} + // suppressWorkflowBy mocks base method func (m *MocknDCWorkflow) suppressWorkflowBy(incomingWorkflow nDCWorkflow) (transactionPolicy, error) { m.ctrl.T.Helper()