Skip to content

Commit

Permalink
Fix NDC events reapplication transaction policy (#2602)
Browse files Browse the repository at this point in the history
* NDC transaction manager backfill events should use active transaction policy if target workflow is current workflow & target workflow is active
* Fix NDC transaction manager target workflow transient from zombie to non-zombie state
* Add more unit test for NDC transaction manager
* Fix null pointer in history engine reapply events API
  • Loading branch information
wxing1292 authored Sep 25, 2019
1 parent 6d5494c commit f6a0247
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 15 deletions.
16 changes: 9 additions & 7 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2452,24 +2452,26 @@ func (e *historyEngineImpl) ReapplyEvents(
domainID,
execution,
func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) {
createDecisionTask := true

postActions := &updateWorkflowAction{
createDecision: true,
}
// Do not create decision task when the workflow is cron and the cron has not been started yet
if msBuilder.GetExecutionInfo().CronSchedule != "" && !msBuilder.HasProcessedOrPendingDecision() {
createDecisionTask = false
postActions.createDecision = false
}
// TODO when https://github.com/uber/cadence/issues/2420 is finished
// reset to workflow finish event
// ignore this case for now
if !msBuilder.IsWorkflowExecutionRunning() {
e.logger.Warn("failed to reapply event to a finished workflow",
e.logger.Warn("cannot reapply event to a finished workflow",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(workflowID),
)
e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount)
return nil, nil
}
postActions := &updateWorkflowAction{
createDecision: createDecisionTask,
return &updateWorkflowAction{
noop: true,
}, nil
}
if err := e.eventsReapplier.reapplyEvents(
ctx,
Expand Down
14 changes: 11 additions & 3 deletions service/history/nDCTransactionMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow(
}

mode := persistence.UpdateWorkflowModeUpdateCurrent
transactionPolicy := transactionPolicyPassive
// since we are not rebuilding the mutable state then we
// can trust the result from IsCurrentWorkflowGuaranteed
if !targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() {
Expand Down Expand Up @@ -265,9 +266,9 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow(
// target workflow is active && target workflow is current workflow
// we need to reapply events here, rather than using reapplyEvents
// within workflow execution context, or otherwise deadlock will appear

if targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() {
// case 1
transactionPolicy = transactionPolicyActive
if err := r.eventsReapplier.reapplyEvents(
ctx,
targetWorkflow.getMutableState(),
Expand All @@ -277,11 +278,18 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow(
}
} else if mode == persistence.UpdateWorkflowModeUpdateCurrent {
// case 2
r.logger.Warn("failed to reapply event to a finished workflow",
transactionPolicy = transactionPolicyActive
r.logger.Warn("cannot reapply event to a finished workflow",
tag.WorkflowDomainID(targetWorkflowEvents.DomainID),
tag.WorkflowID(targetWorkflowEvents.WorkflowID),
)
r.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount)
// TODO when https://github.com/uber/cadence/issues/2420 is finished
// reset to workflow finish event and reapply to new resetted workflow by using
// transactionPolicyActive, ignore this case for now
} else {
// target workflow is active but not being pointed by current record
// do not need to handle events reapplication here
}
}

Expand All @@ -290,7 +298,7 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow(
mode,
nil,
nil,
transactionPolicyPassive,
transactionPolicy,
nil,
)
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/nDCTransactionMgrForExistingWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCur
if err != nil {
return err
}
if err := targetWorkflow.revive(); err != nil {
return err
}
}

var newContext workflowExecutionContext
Expand Down
2 changes: 2 additions & 0 deletions service/history/nDCTransactionMgrForExistingWorkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
currentWorkflowPolicy := transactionPolicyPassive
currentMutableState.On("IsWorkflowExecutionRunning").Return(true)
currentWorkflow.EXPECT().suppressWorkflowBy(targetWorkflow).Return(currentWorkflowPolicy, nil).Times(1)
targetWorkflow.EXPECT().revive().Return(nil).Times(1)

targetContext.On(
"conflictResolveWorkflowExecution",
Expand Down Expand Up @@ -426,6 +427,7 @@ func (s *nDCTransactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkf
currentWorkflowPolicy := transactionPolicyActive
currentMutableState.On("IsWorkflowExecutionRunning").Return(true)
currentWorkflow.EXPECT().suppressWorkflowBy(targetWorkflow).Return(currentWorkflowPolicy, nil).Times(1)
targetWorkflow.EXPECT().revive().Return(nil).Times(1)

targetContext.On(
"conflictResolveWorkflowExecution",
Expand Down
3 changes: 3 additions & 0 deletions service/history/nDCTransactionMgrForNewWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) suppressCurrentAndCreateAsCurrent(
if err != nil {
return err
}
if err := targetWorkflow.revive(); err != nil {
return err
}

return currentWorkflow.getContext().updateWorkflowExecutionWithNew(
now,
Expand Down
1 change: 1 addition & 0 deletions service/history/nDCTransactionMgrForNewWorkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func (s *nDCTransactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_Suppre
currentMutableState.On("IsWorkflowExecutionRunning").Return(true)
currentWorkflowPolicy := transactionPolicyActive
currentWorkflow.EXPECT().suppressWorkflowBy(targetWorkflow).Return(currentWorkflowPolicy, nil).Times(1)
targetWorkflow.EXPECT().revive().Return(nil).Times(1)

currentContext.On(
"updateWorkflowExecutionWithNew",
Expand Down
120 changes: 115 additions & 5 deletions service/history/nDCTransactionMgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *nDCTransactionMgrSuite) TestUpdateWorkflow() {
s.NoError(err)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_ReapplyEvents() {
func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_Active_ReapplyEvents() {
ctx := ctx.Background()
now := time.Now()
currentVersion := int64(1234)
Expand Down Expand Up @@ -167,14 +167,14 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_ReapplyE
"persistNonFirstWorkflowEvents", workflowEvents,
).Return(int64(0), nil).Once()
context.On(
"updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyPassive, (*transactionPolicy)(nil),
"updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyActive, (*transactionPolicy)(nil),
).Return(nil).Once()
err := s.transactionMgr.backfillWorkflow(ctx, now, workflow, workflowEvents)
s.NoError(err)
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_NoReapplyEvents() {
func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_Passive_NoReapplyEvents() {
ctx := ctx.Background()
now := time.Now()
currentVersion := int64(1234)
Expand Down Expand Up @@ -211,7 +211,7 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_NoReappl
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent() {
func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent_Active() {
ctx := ctx.Background()
now := time.Now()
currentVersion := int64(1234)
Expand Down Expand Up @@ -269,7 +269,65 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent() {
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current() {
func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_NotCurrent_Passive() {
ctx := ctx.Background()
now := time.Now()
currentVersion := int64(1234)

domainID := "some random domain ID"
workflowID := "some random workflow ID"
runID := "some random run ID"
currentRunID := "other random run ID"

releaseCalled := false

workflow := NewMocknDCWorkflow(s.controller)
context := &mockWorkflowExecutionContext{}
defer context.AssertExpectations(s.T())
mutableState := &mockMutableState{}
defer mutableState.AssertExpectations(s.T())
var releaseFn releaseWorkflowExecutionFunc = func(error) { releaseCalled = true }

workflowEvents := &persistence.WorkflowEvents{
Events: []*shared.HistoryEvent{{
EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled),
}},
DomainID: domainID,
WorkflowID: workflowID,
}

workflow.EXPECT().getContext().Return(context).AnyTimes()
workflow.EXPECT().getMutableState().Return(mutableState).AnyTimes()
workflow.EXPECT().getReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestAlternativeClusterName)

mutableState.On("IsCurrentWorkflowGuaranteed").Return(false)
mutableState.On("GetCurrentVersion").Return(currentVersion)
mutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})

s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{
DomainID: domainID,
WorkflowID: workflowID,
}).Return(&persistence.GetCurrentExecutionResponse{RunID: currentRunID}, nil).Once()

context.On(
"persistNonFirstWorkflowEvents", workflowEvents,
).Return(int64(0), nil).Once()
context.On(
"updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeBypassCurrent, nil, nil, transactionPolicyPassive, (*transactionPolicy)(nil),
).Return(nil).Once()
err := s.transactionMgr.backfillWorkflow(ctx, now, workflow, workflowEvents)
s.NoError(err)
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current_Active() {
ctx := ctx.Background()
now := time.Now()
currentVersion := int64(1234)
Expand Down Expand Up @@ -309,6 +367,58 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current() {
WorkflowID: workflowID,
}).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once()

context.On(
"persistNonFirstWorkflowEvents", workflowEvents,
).Return(int64(0), nil).Once()
context.On(
"updateWorkflowExecutionWithNew", now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyActive, (*transactionPolicy)(nil),
).Return(nil).Once()

err := s.transactionMgr.backfillWorkflow(ctx, now, workflow, workflowEvents)
s.NoError(err)
s.True(releaseCalled)
}

func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CheckDB_Current_Passive() {
ctx := ctx.Background()
now := time.Now()
currentVersion := int64(1234)

domainID := "some random domain ID"
workflowID := "some random workflow ID"
runID := "some random run ID"

releaseCalled := false

workflow := NewMocknDCWorkflow(s.controller)
context := &mockWorkflowExecutionContext{}
defer context.AssertExpectations(s.T())
mutableState := &mockMutableState{}
defer mutableState.AssertExpectations(s.T())
var releaseFn releaseWorkflowExecutionFunc = func(error) { releaseCalled = true }

workflowEvents := &persistence.WorkflowEvents{}

workflow.EXPECT().getContext().Return(context).AnyTimes()
workflow.EXPECT().getMutableState().Return(mutableState).AnyTimes()
workflow.EXPECT().getReleaseFn().Return(releaseFn).AnyTimes()

s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestAlternativeClusterName)

mutableState.On("IsCurrentWorkflowGuaranteed").Return(false)
mutableState.On("GetCurrentVersion").Return(currentVersion)
mutableState.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})

s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{
DomainID: domainID,
WorkflowID: workflowID,
}).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once()

context.On(
"persistNonFirstWorkflowEvents", workflowEvents,
).Return(int64(0), nil).Once()
Expand Down
22 changes: 22 additions & 0 deletions service/history/nDCWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
getReleaseFn() releaseWorkflowExecutionFunc
getVectorClock() (int64, int64, error)
happensAfter(that nDCWorkflow) (bool, error)
revive() error
suppressWorkflowBy(incomingWorkflow nDCWorkflow) (transactionPolicy, error)
flushBufferedEvents() error
}
Expand Down Expand Up @@ -126,6 +127,27 @@ func (r *nDCWorkflowImpl) happensAfter(
), nil
}

func (r *nDCWorkflowImpl) revive() error {

state, _ := r.mutableState.GetWorkflowStateCloseStatus()
if state != persistence.WorkflowStateZombie {
return nil
} else if state == persistence.WorkflowStateCompleted {
// workflow already finished
return nil
}

// workflow is in zombie state, need to set the state correctly accordingly
state = persistence.WorkflowStateCreated
if r.mutableState.HasProcessedOrPendingDecision() {
state = persistence.WorkflowStateRunning
}
return r.mutableState.UpdateWorkflowStateCloseStatus(
state,
persistence.WorkflowCloseStatusNone,
)
}

func (r *nDCWorkflowImpl) suppressWorkflowBy(
incomingWorkflow nDCWorkflow,
) (transactionPolicy, error) {
Expand Down
14 changes: 14 additions & 0 deletions service/history/nDCWorkflow_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f6a0247

Please sign in to comment.