From b4b4686cf9b97bc88d0201e6f477deda49f58e84 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 21 Nov 2019 12:15:09 -0800 Subject: [PATCH] Adding a layer to ignore reapplied events (#2787) * Adding logic to handle duplicate reapply events --- common/definition/resourceDeduplication.go | 90 +++++++++++++ .../definition/resourceDeduplication_test.go | 65 +++++++++ service/history/handler.go | 5 +- service/history/historyEngine.go | 21 ++- service/history/historyEngine_mock.go | 8 +- service/history/mutableState.go | 3 + service/history/mutableStateBuilder.go | 19 +++ service/history/mutableStateBuilder_test.go | 13 ++ service/history/mutableState_mock.go | 27 ++++ service/history/nDCEventsReapplier.go | 34 +++-- service/history/nDCEventsReapplier_mock.go | 13 +- service/history/nDCEventsReapplier_test.go | 123 +++++++++++++++++- service/history/nDCTransactionMgr.go | 6 +- service/history/nDCTransactionMgr_test.go | 5 +- service/history/workflowExecutionContext.go | 5 +- 15 files changed, 400 insertions(+), 37 deletions(-) create mode 100644 common/definition/resourceDeduplication.go create mode 100644 common/definition/resourceDeduplication_test.go diff --git a/common/definition/resourceDeduplication.go b/common/definition/resourceDeduplication.go new file mode 100644 index 00000000000..f7ac2e0bcc9 --- /dev/null +++ b/common/definition/resourceDeduplication.go @@ -0,0 +1,90 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package definition + +import ( + "fmt" +) + +const ( + resourceIDTemplate = "%v::%v" + eventReappliedIDTemplate = "%v::%v::%v" +) + +type ( + // DeduplicationID uses to generate id for deduplication + DeduplicationID interface { + GetID() string + } +) + +// Deduplication id type +const ( + eventReappliedID = iota +) + +// Deduplication resource struct +type ( + // EventReappliedID is the deduplication resource for reapply event + EventReappliedID struct { + id string + } +) + +// NewEventReappliedID returns EventReappliedID resource +func NewEventReappliedID( + runID string, + eventID int64, + version int64, +) EventReappliedID { + + newID := fmt.Sprintf( + eventReappliedIDTemplate, + runID, + eventID, + version, + ) + return EventReappliedID{ + id: newID, + } +} + +// GetID returns id of EventReappliedID +func (e EventReappliedID) GetID() string { + return e.id +} + +// GenerateDeduplicationKey generates deduplication key +func GenerateDeduplicationKey( + resource DeduplicationID, +) string { + + switch resource.(type) { + case EventReappliedID: + return generateKey(eventReappliedID, resource.GetID()) + default: + panic("unsupported deduplication key") + } +} + +func generateKey(resourceType int32, id string) string { + return fmt.Sprintf(resourceIDTemplate, resourceType, id) +} diff --git a/common/definition/resourceDeduplication_test.go b/common/definition/resourceDeduplication_test.go new file mode 100644 index 00000000000..58da3646661 --- /dev/null +++ b/common/definition/resourceDeduplication_test.go @@ -0,0 +1,65 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package definition + +import ( + "fmt" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/suite" +) + +type ( + resourceDeduplicationSuite struct { + suite.Suite + controller *gomock.Controller + } +) + +func TestResourceDeduplicationSuite(t *testing.T) { + s := new(resourceDeduplicationSuite) + suite.Run(t, s) +} + +func (s *resourceDeduplicationSuite) TestGenerateKey() { + resourceType := int32(1) + id := "id" + key := generateKey(resourceType, id) + s.Equal(fmt.Sprintf("%v::%v", resourceType, id), key) +} + +func (s *resourceDeduplicationSuite) TestGenerateDeduplicationKey() { + runID := "runID" + eventID := int64(1) + version := int64(2) + resource := NewEventReappliedID(runID, eventID, version) + key := GenerateDeduplicationKey(resource) + s.Equal(fmt.Sprintf("%v::%v::%v::%v", eventReappliedID, runID, eventID, version), key) +} + +func (s *resourceDeduplicationSuite) TestEventReappliedID() { + runID := "runID" + eventID := int64(1) + version := int64(2) + resource := NewEventReappliedID(runID, eventID, version) + s.Equal(fmt.Sprintf("%v::%v::%v", runID, eventID, version), resource.GetID()) +} diff --git a/service/history/handler.go b/service/history/handler.go index 576e9335236..3036f16da71 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -1564,10 +1564,13 @@ func (h *Handler) ReapplyEvents( if err != nil { return h.error(err, scope, domainID, workflowID) } + + execution := request.GetRequest().GetWorkflowExecution() if err := engine.ReapplyEvents( ctx, request.GetDomainUUID(), - request.GetRequest().GetWorkflowExecution().GetWorkflowId(), + execution.GetWorkflowId(), + execution.GetRunId(), historyEvents, ); err != nil { return h.error(err, scope, domainID, workflowID) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 19f1a726e4f..a6b4c9c1f88 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -97,7 +97,7 @@ type ( SyncActivity(ctx ctx.Context, request *h.SyncActivityRequest) error GetReplicationMessages(ctx ctx.Context, taskID int64) (*r.ReplicationMessages, error) QueryWorkflow(ctx ctx.Context, request *h.QueryWorkflowRequest) (*h.QueryWorkflowResponse, error) - ReapplyEvents(ctx ctx.Context, domainUUID string, workflowID string, events []*workflow.HistoryEvent) error + ReapplyEvents(ctx ctx.Context, domainUUID string, workflowID string, runID string, events []*workflow.HistoryEvent) error NotifyNewHistoryEvent(event *historyEventNotification) NotifyNewTransferTasks(tasks []persistence.Task) @@ -2682,6 +2682,7 @@ func (e *historyEngineImpl) ReapplyEvents( ctx ctx.Context, domainUUID string, workflowID string, + runID string, reapplyEvents []*workflow.HistoryEvent, ) error { @@ -2691,14 +2692,14 @@ func (e *historyEngineImpl) ReapplyEvents( } domainID := domainEntry.GetInfo().ID // remove run id from the execution so that reapply events to the current run - execution := workflow.WorkflowExecution{ + currentExecution := workflow.WorkflowExecution{ WorkflowId: common.StringPtr(workflowID), } return e.updateWorkflowExecutionWithAction( ctx, domainID, - execution, + currentExecution, func(mutableState mutableState) (*updateWorkflowAction, error) { postActions := &updateWorkflowAction{ @@ -2714,22 +2715,28 @@ func (e *historyEngineImpl) ReapplyEvents( if !mutableState.IsWorkflowExecutionRunning() { e.logger.Warn("cannot reapply event to a finished workflow", tag.WorkflowDomainID(domainID), - tag.WorkflowID(workflowID), + tag.WorkflowID(currentExecution.GetWorkflowId()), ) e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) return &updateWorkflowAction{ noop: true, }, nil } - if err := e.eventsReapplier.reapplyEvents( + reappliedEvents, err := e.eventsReapplier.reapplyEvents( ctx, mutableState, reapplyEvents, - ); err != nil { + runID, + ) + if err != nil { e.logger.Error("failed to re-apply stale events", tag.Error(err)) return nil, &workflow.InternalServiceError{Message: "unable to re-apply stale events"} } - + if len(reappliedEvents) == 0 { + return &updateWorkflowAction{ + noop: true, + }, nil + } return postActions, nil }) } diff --git a/service/history/historyEngine_mock.go b/service/history/historyEngine_mock.go index 19e82ab2443..428e149d759 100644 --- a/service/history/historyEngine_mock.go +++ b/service/history/historyEngine_mock.go @@ -507,17 +507,17 @@ func (mr *MockEngineMockRecorder) QueryWorkflow(ctx, request interface{}) *gomoc } // ReapplyEvents mocks base method -func (m *MockEngine) ReapplyEvents(ctx context.Context, domainUUID, workflowID string, events []*shared.HistoryEvent) error { +func (m *MockEngine) ReapplyEvents(ctx context.Context, domainUUID, workflowID, runID string, events []*shared.HistoryEvent) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReapplyEvents", ctx, domainUUID, workflowID, events) + ret := m.ctrl.Call(m, "ReapplyEvents", ctx, domainUUID, workflowID, runID, events) ret0, _ := ret[0].(error) return ret0 } // ReapplyEvents indicates an expected call of ReapplyEvents -func (mr *MockEngineMockRecorder) ReapplyEvents(ctx, domainUUID, workflowID, events interface{}) *gomock.Call { +func (mr *MockEngineMockRecorder) ReapplyEvents(ctx, domainUUID, workflowID, runID, events interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReapplyEvents", reflect.TypeOf((*MockEngine)(nil).ReapplyEvents), ctx, domainUUID, workflowID, events) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReapplyEvents", reflect.TypeOf((*MockEngine)(nil).ReapplyEvents), ctx, domainUUID, workflowID, runID, events) } // NotifyNewHistoryEvent mocks base method diff --git a/service/history/mutableState.go b/service/history/mutableState.go index eae726c0fb2..77a533aab34 100644 --- a/service/history/mutableState.go +++ b/service/history/mutableState.go @@ -28,6 +28,7 @@ import ( h "github.com/uber/cadence/.gen/go/history" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/persistence" ) @@ -157,6 +158,8 @@ type ( IsSignalRequested(requestID string) bool IsStickyTaskListEnabled() bool IsWorkflowExecutionRunning() bool + IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool + UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID) Load(*persistence.WorkflowMutableState) ReplicateActivityInfo(*h.SyncActivityRequest, bool) error ReplicateActivityTaskCancelRequestedEvent(*workflow.HistoryEvent) error diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 01f615504d2..62018534006 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -122,6 +122,9 @@ type ( // domain entry contains a snapshot of domain // NOTE: do not use the failover version inside, use currentVersion above domainEntry *cache.DomainCacheEntry + // record if a event has been applied to mutable state + // TODO: persist this to db + appliedEvents map[string]struct{} insertTransferTasks []persistence.Task insertReplicationTasks []persistence.Task @@ -181,6 +184,7 @@ func newMutableStateBuilder( stateInDB: persistence.WorkflowStateVoid, nextEventIDInDB: 0, domainEntry: domainEntry, + appliedEvents: make(map[string]struct{}), queryRegistry: newQueryRegistry(), @@ -3962,6 +3966,21 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( return workflowSnapshot, workflowEventsSeq, nil } +func (e *mutableStateBuilder) IsResourceDuplicated( + resourceDedupKey definition.DeduplicationID, +) bool { + id := definition.GenerateDeduplicationKey(resourceDedupKey) + _, duplicated := e.appliedEvents[id] + return duplicated +} + +func (e *mutableStateBuilder) UpdateDuplicatedResource( + resourceDedupKey definition.DeduplicationID, +) { + id := definition.GenerateDeduplicationKey(resourceDedupKey) + e.appliedEvents[id] = struct{}{} +} + func (e *mutableStateBuilder) prepareCloseTransaction( now time.Time, transactionPolicy transactionPolicy, diff --git a/service/history/mutableStateBuilder_test.go b/service/history/mutableStateBuilder_test.go index 71f597cf829..dcfdae58bb5 100644 --- a/service/history/mutableStateBuilder_test.go +++ b/service/history/mutableStateBuilder_test.go @@ -33,6 +33,7 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/persistence" @@ -398,6 +399,18 @@ func (s *mutableStateSuite) TestMergeMapOfByteArray() { s.Equal(2, len(resultMap)) } +func (s *mutableStateSuite) TestEventReapplied() { + runID := uuid.New() + eventID := int64(1) + version := int64(2) + dedupResource := definition.NewEventReappliedID(runID, eventID, version) + isReapplied := s.msBuilder.IsResourceDuplicated(dedupResource) + s.False(isReapplied) + s.msBuilder.UpdateDuplicatedResource(dedupResource) + isReapplied = s.msBuilder.IsResourceDuplicated(dedupResource) + s.True(isReapplied) +} + func (s *mutableStateSuite) prepareTransientDecisionCompletionFirstBatchReplicated(version int64, runID string) (*shared.HistoryEvent, *shared.HistoryEvent) { domainID := testDomainID execution := shared.WorkflowExecution{ diff --git a/service/history/mutableState_mock.go b/service/history/mutableState_mock.go index dea639568a1..305a82ab0fa 100644 --- a/service/history/mutableState_mock.go +++ b/service/history/mutableState_mock.go @@ -36,6 +36,7 @@ import ( history "github.com/uber/cadence/.gen/go/history" shared "github.com/uber/cadence/.gen/go/shared" cache "github.com/uber/cadence/common/cache" + definition "github.com/uber/cadence/common/definition" persistence "github.com/uber/cadence/common/persistence" ) @@ -1568,6 +1569,32 @@ func (mr *MockmutableStateMockRecorder) IsWorkflowExecutionRunning() *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsWorkflowExecutionRunning", reflect.TypeOf((*MockmutableState)(nil).IsWorkflowExecutionRunning)) } +// IsResourceDuplicated mocks base method +func (m *MockmutableState) IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsResourceDuplicated", resourceDedupKey) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsResourceDuplicated indicates an expected call of IsResourceDuplicated +func (mr *MockmutableStateMockRecorder) IsResourceDuplicated(resourceDedupKey interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsResourceDuplicated", reflect.TypeOf((*MockmutableState)(nil).IsResourceDuplicated), resourceDedupKey) +} + +// UpdateDuplicatedResource mocks base method +func (m *MockmutableState) UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateDuplicatedResource", resourceDedupKey) +} + +// UpdateDuplicatedResource indicates an expected call of UpdateDuplicatedResource +func (mr *MockmutableStateMockRecorder) UpdateDuplicatedResource(resourceDedupKey interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDuplicatedResource", reflect.TypeOf((*MockmutableState)(nil).UpdateDuplicatedResource), resourceDedupKey) +} + // Load mocks base method func (m *MockmutableState) Load(arg0 *persistence.WorkflowMutableState) { m.ctrl.T.Helper() diff --git a/service/history/nDCEventsReapplier.go b/service/history/nDCEventsReapplier.go index e212b5c89fa..37b85f3b628 100644 --- a/service/history/nDCEventsReapplier.go +++ b/service/history/nDCEventsReapplier.go @@ -26,6 +26,7 @@ import ( ctx "context" workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" ) @@ -36,7 +37,8 @@ type ( ctx ctx.Context, msBuilder mutableState, historyEvents []*workflow.HistoryEvent, - ) error + runID string, + ) ([]*workflow.HistoryEvent, error) } nDCEventsReapplierImpl struct { @@ -60,38 +62,46 @@ func (r *nDCEventsReapplierImpl) reapplyEvents( ctx ctx.Context, msBuilder mutableState, historyEvents []*workflow.HistoryEvent, -) error { + runID string, +) ([]*workflow.HistoryEvent, error) { - var reapplyEvents []*workflow.HistoryEvent - // TODO: need to implement Reapply policy + var toReapplyEvents []*workflow.HistoryEvent for _, event := range historyEvents { switch event.GetEventType() { case workflow.EventTypeWorkflowExecutionSignaled: - reapplyEvents = append(reapplyEvents, event) + dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) + if msBuilder.IsResourceDuplicated(dedupResource) { + // skip already applied event + continue + } + toReapplyEvents = append(toReapplyEvents, event) } } - if len(reapplyEvents) == 0 { - return nil + if len(toReapplyEvents) == 0 { + return nil, nil } if !msBuilder.IsWorkflowExecutionRunning() { // TODO when https://github.com/uber/cadence/issues/2420 is finished // reset to workflow finish event // ignore this case for now - return nil + return nil, nil } - // TODO: need to have signal deduplicate logic - for _, event := range reapplyEvents { + var reappliedEvents []*workflow.HistoryEvent + for _, event := range toReapplyEvents { signal := event.GetWorkflowExecutionSignaledEventAttributes() if _, err := msBuilder.AddWorkflowExecutionSignaled( signal.GetSignalName(), signal.GetInput(), signal.GetIdentity(), ); err != nil { - return err + return nil, err } + deDupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) + msBuilder.UpdateDuplicatedResource(deDupResource) + reappliedEvents = append(reappliedEvents, event) } - return nil + return reappliedEvents, nil } diff --git a/service/history/nDCEventsReapplier_mock.go b/service/history/nDCEventsReapplier_mock.go index ca8bda67839..ca7787e0b0f 100644 --- a/service/history/nDCEventsReapplier_mock.go +++ b/service/history/nDCEventsReapplier_mock.go @@ -60,15 +60,16 @@ func (m *MocknDCEventsReapplier) EXPECT() *MocknDCEventsReapplierMockRecorder { } // reapplyEvents mocks base method -func (m *MocknDCEventsReapplier) reapplyEvents(ctx context.Context, msBuilder mutableState, historyEvents []*shared.HistoryEvent) error { +func (m *MocknDCEventsReapplier) reapplyEvents(ctx context.Context, msBuilder mutableState, historyEvents []*shared.HistoryEvent, runID string) ([]*shared.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "reapplyEvents", ctx, msBuilder, historyEvents) - ret0, _ := ret[0].(error) - return ret0 + ret := m.ctrl.Call(m, "reapplyEvents", ctx, msBuilder, historyEvents, runID) + ret0, _ := ret[0].([]*shared.HistoryEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 } // reapplyEvents indicates an expected call of reapplyEvents -func (mr *MocknDCEventsReapplierMockRecorder) reapplyEvents(ctx, msBuilder, historyEvents interface{}) *gomock.Call { +func (mr *MocknDCEventsReapplierMockRecorder) reapplyEvents(ctx, msBuilder, historyEvents, runID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "reapplyEvents", reflect.TypeOf((*MocknDCEventsReapplier)(nil).reapplyEvents), ctx, msBuilder, historyEvents) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "reapplyEvents", reflect.TypeOf((*MocknDCEventsReapplier)(nil).reapplyEvents), ctx, msBuilder, historyEvents, runID) } diff --git a/service/history/nDCEventsReapplier_test.go b/service/history/nDCEventsReapplier_test.go index 297e529b27c..f95a5617b8a 100644 --- a/service/history/nDCEventsReapplier_test.go +++ b/service/history/nDCEventsReapplier_test.go @@ -22,6 +22,7 @@ package history import ( "context" + "fmt" "testing" "github.com/golang/mock/gomock" @@ -32,6 +33,7 @@ import ( "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" @@ -70,7 +72,8 @@ func (s *nDCEventReapplicationSuite) TearDownTest() { s.controller.Finish() } -func (s *nDCEventReapplicationSuite) TestReapplyEvents() { +func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent() { + runID := uuid.New() execution := &persistence.WorkflowExecutionInfo{ DomainID: uuid.New(), } @@ -90,13 +93,127 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents() { msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(int64(1), nil).AnyTimes() msBuilderCurrent.EXPECT().GetExecutionInfo().Return(execution).AnyTimes() msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled( - attr.GetSignalName(), attr.GetInput(), attr.GetIdentity(), + attr.GetSignalName(), + attr.GetInput(), + attr.GetIdentity(), ).Return(event, nil).Times(1) + dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) + msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource).Return(false).Times(1) + msBuilderCurrent.EXPECT().UpdateDuplicatedResource(dedupResource).Times(1) + events := []*shared.HistoryEvent{ + {EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionStarted)}, + event, + } + appliedEvent, err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events, runID) + s.NoError(err) + s.Equal(1, len(appliedEvent)) +} + +func (s *nDCEventReapplicationSuite) TestReapplyEvents_Noop() { + runID := uuid.New() + event := &shared.HistoryEvent{ + EventId: common.Int64Ptr(1), + EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled), + WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{ + Identity: common.StringPtr("test"), + SignalName: common.StringPtr("signal"), + Input: []byte{}, + }, + } + msBuilderCurrent := NewMockmutableState(s.controller) + dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) + msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource).Return(true).Times(1) events := []*shared.HistoryEvent{ {EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionStarted)}, event, } - err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events) + appliedEvent, err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events, runID) + s.NoError(err) + s.Equal(0, len(appliedEvent)) +} + +func (s *nDCEventReapplicationSuite) TestReapplyEvents_PartialAppliedEvent() { + runID := uuid.New() + execution := &persistence.WorkflowExecutionInfo{ + DomainID: uuid.New(), + } + event1 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(1), + EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled), + WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{ + Identity: common.StringPtr("test"), + SignalName: common.StringPtr("signal"), + Input: []byte{}, + }, + } + event2 := &shared.HistoryEvent{ + EventId: common.Int64Ptr(2), + EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled), + WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{ + Identity: common.StringPtr("test"), + SignalName: common.StringPtr("signal"), + Input: []byte{}, + }, + } + attr1 := event1.WorkflowExecutionSignaledEventAttributes + + msBuilderCurrent := NewMockmutableState(s.controller) + msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true) + msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(int64(1), nil).AnyTimes() + msBuilderCurrent.EXPECT().GetExecutionInfo().Return(execution).AnyTimes() + msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled( + attr1.GetSignalName(), + attr1.GetInput(), + attr1.GetIdentity(), + ).Return(event1, nil).Times(1) + dedupResource1 := definition.NewEventReappliedID(runID, event1.GetEventId(), event1.GetVersion()) + msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource1).Return(false).Times(1) + dedupResource2 := definition.NewEventReappliedID(runID, event2.GetEventId(), event2.GetVersion()) + msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource2).Return(true).Times(1) + msBuilderCurrent.EXPECT().UpdateDuplicatedResource(dedupResource1).Times(1) + events := []*shared.HistoryEvent{ + {EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionStarted)}, + event1, + event2, + } + appliedEvent, err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events, runID) s.NoError(err) + s.Equal(1, len(appliedEvent)) +} + +func (s *nDCEventReapplicationSuite) TestReapplyEvents_Error() { + runID := uuid.New() + execution := &persistence.WorkflowExecutionInfo{ + DomainID: uuid.New(), + } + event := &shared.HistoryEvent{ + EventId: common.Int64Ptr(1), + EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled), + WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{ + Identity: common.StringPtr("test"), + SignalName: common.StringPtr("signal"), + Input: []byte{}, + }, + } + attr := event.WorkflowExecutionSignaledEventAttributes + + msBuilderCurrent := NewMockmutableState(s.controller) + msBuilderCurrent.EXPECT().IsWorkflowExecutionRunning().Return(true) + msBuilderCurrent.EXPECT().GetLastWriteVersion().Return(int64(1), nil).AnyTimes() + msBuilderCurrent.EXPECT().GetExecutionInfo().Return(execution).AnyTimes() + msBuilderCurrent.EXPECT().AddWorkflowExecutionSignaled( + attr.GetSignalName(), + attr.GetInput(), + attr.GetIdentity(), + ).Return(nil, fmt.Errorf("test")).Times(1) + dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion()) + msBuilderCurrent.EXPECT().IsResourceDuplicated(dedupResource).Return(false).Times(1) + events := []*shared.HistoryEvent{ + {EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionStarted)}, + event, + } + appliedEvent, err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events, runID) + s.Error(err) + s.Equal(0, len(appliedEvent)) } diff --git a/service/history/nDCTransactionMgr.go b/service/history/nDCTransactionMgr.go index 817a3e3ccb3..2eacc3934a7 100644 --- a/service/history/nDCTransactionMgr.go +++ b/service/history/nDCTransactionMgr.go @@ -272,13 +272,15 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow( // target workflow is active && target workflow is current workflow // we need to reapply events here, rather than using reapplyEvents // within workflow execution context, or otherwise deadlock will appear - if targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() { + targetMutableState := targetWorkflow.getMutableState() + if targetMutableState.IsCurrentWorkflowGuaranteed() { // case 1 transactionPolicy = transactionPolicyActive - if err := r.eventsReapplier.reapplyEvents( + if _, err := r.eventsReapplier.reapplyEvents( ctx, targetWorkflow.getMutableState(), targetWorkflowEvents.Events, + targetMutableState.GetExecutionInfo().RunID, ); err != nil { return err } diff --git a/service/history/nDCTransactionMgr_test.go b/service/history/nDCTransactionMgr_test.go index 9bc817fd4dc..24585d73aaf 100644 --- a/service/history/nDCTransactionMgr_test.go +++ b/service/history/nDCTransactionMgr_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" @@ -139,6 +140,7 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_Active_R now := time.Now() currentVersion := int64(1234) releaseCalled := false + runID := uuid.New() workflow := NewMocknDCWorkflow(s.controller) context := NewMockworkflowExecutionContext(s.controller) @@ -155,10 +157,11 @@ func (s *nDCTransactionMgrSuite) TestBackfillWorkflow_CurrentGuaranteed_Active_R s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(cluster.TestCurrentClusterName) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) - s.mockEventsReapplier.EXPECT().reapplyEvents(ctx, mutableState, workflowEvents.Events).Return(nil).Times(1) + s.mockEventsReapplier.EXPECT().reapplyEvents(ctx, mutableState, workflowEvents.Events, runID).Return(workflowEvents.Events, nil).Times(1) mutableState.EXPECT().IsCurrentWorkflowGuaranteed().Return(true).AnyTimes() mutableState.EXPECT().GetCurrentVersion().Return(currentVersion).AnyTimes() + mutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{RunID: runID}).Times(1) context.EXPECT().persistNonFirstWorkflowEvents(workflowEvents).Return(int64(0), nil).Times(1) context.EXPECT().updateWorkflowExecutionWithNew( now, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, transactionPolicyActive, (*transactionPolicy)(nil), diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index fb305c6d2b2..f2423955cf6 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -1184,6 +1184,7 @@ func (c *workflowExecutionContextImpl) reapplyEvents( domainID := eventBatches[0].DomainID workflowID := eventBatches[0].WorkflowID + runID := eventBatches[0].RunID var reapplyEvents []*workflow.HistoryEvent for _, events := range eventBatches { if events.DomainID != domainID || @@ -1205,9 +1206,10 @@ func (c *workflowExecutionContextImpl) reapplyEvents( } // Reapply events only reapply to the current run. - // Leave the run id empty will reapply events to the current run. + // The run id is only used for reapply event de-duplication execution := &workflow.WorkflowExecution{ WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), } domainCache := c.shard.GetDomainCache() clientBean := c.shard.GetService().GetClientBean() @@ -1226,6 +1228,7 @@ func (c *workflowExecutionContextImpl) reapplyEvents( ctx, domainID, workflowID, + runID, reapplyEvents, ) }