Skip to content

Commit

Permalink
Adding a layer to ignore reapplied events (#2787)
Browse files Browse the repository at this point in the history
* Adding logic to handle duplicate reapply events
  • Loading branch information
yux0 authored Nov 21, 2019
1 parent 6137576 commit b4b4686
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 37 deletions.
90 changes: 90 additions & 0 deletions common/definition/resourceDeduplication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package definition

import (
"fmt"
)

const (
resourceIDTemplate = "%v::%v"
eventReappliedIDTemplate = "%v::%v::%v"
)

type (
// DeduplicationID uses to generate id for deduplication
DeduplicationID interface {
GetID() string
}
)

// Deduplication id type
const (
eventReappliedID = iota
)

// Deduplication resource struct
type (
// EventReappliedID is the deduplication resource for reapply event
EventReappliedID struct {
id string
}
)

// NewEventReappliedID returns EventReappliedID resource
func NewEventReappliedID(
runID string,
eventID int64,
version int64,
) EventReappliedID {

newID := fmt.Sprintf(
eventReappliedIDTemplate,
runID,
eventID,
version,
)
return EventReappliedID{
id: newID,
}
}

// GetID returns id of EventReappliedID
func (e EventReappliedID) GetID() string {
return e.id
}

// GenerateDeduplicationKey generates deduplication key
func GenerateDeduplicationKey(
resource DeduplicationID,
) string {

switch resource.(type) {
case EventReappliedID:
return generateKey(eventReappliedID, resource.GetID())
default:
panic("unsupported deduplication key")
}
}

func generateKey(resourceType int32, id string) string {
return fmt.Sprintf(resourceIDTemplate, resourceType, id)
}
65 changes: 65 additions & 0 deletions common/definition/resourceDeduplication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package definition

import (
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
)

type (
resourceDeduplicationSuite struct {
suite.Suite
controller *gomock.Controller
}
)

func TestResourceDeduplicationSuite(t *testing.T) {
s := new(resourceDeduplicationSuite)
suite.Run(t, s)
}

func (s *resourceDeduplicationSuite) TestGenerateKey() {
resourceType := int32(1)
id := "id"
key := generateKey(resourceType, id)
s.Equal(fmt.Sprintf("%v::%v", resourceType, id), key)
}

func (s *resourceDeduplicationSuite) TestGenerateDeduplicationKey() {
runID := "runID"
eventID := int64(1)
version := int64(2)
resource := NewEventReappliedID(runID, eventID, version)
key := GenerateDeduplicationKey(resource)
s.Equal(fmt.Sprintf("%v::%v::%v::%v", eventReappliedID, runID, eventID, version), key)
}

func (s *resourceDeduplicationSuite) TestEventReappliedID() {
runID := "runID"
eventID := int64(1)
version := int64(2)
resource := NewEventReappliedID(runID, eventID, version)
s.Equal(fmt.Sprintf("%v::%v::%v", runID, eventID, version), resource.GetID())
}
5 changes: 4 additions & 1 deletion service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,10 +1564,13 @@ func (h *Handler) ReapplyEvents(
if err != nil {
return h.error(err, scope, domainID, workflowID)
}

execution := request.GetRequest().GetWorkflowExecution()
if err := engine.ReapplyEvents(
ctx,
request.GetDomainUUID(),
request.GetRequest().GetWorkflowExecution().GetWorkflowId(),
execution.GetWorkflowId(),
execution.GetRunId(),
historyEvents,
); err != nil {
return h.error(err, scope, domainID, workflowID)
Expand Down
21 changes: 14 additions & 7 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type (
SyncActivity(ctx ctx.Context, request *h.SyncActivityRequest) error
GetReplicationMessages(ctx ctx.Context, taskID int64) (*r.ReplicationMessages, error)
QueryWorkflow(ctx ctx.Context, request *h.QueryWorkflowRequest) (*h.QueryWorkflowResponse, error)
ReapplyEvents(ctx ctx.Context, domainUUID string, workflowID string, events []*workflow.HistoryEvent) error
ReapplyEvents(ctx ctx.Context, domainUUID string, workflowID string, runID string, events []*workflow.HistoryEvent) error

NotifyNewHistoryEvent(event *historyEventNotification)
NotifyNewTransferTasks(tasks []persistence.Task)
Expand Down Expand Up @@ -2682,6 +2682,7 @@ func (e *historyEngineImpl) ReapplyEvents(
ctx ctx.Context,
domainUUID string,
workflowID string,
runID string,
reapplyEvents []*workflow.HistoryEvent,
) error {

Expand All @@ -2691,14 +2692,14 @@ func (e *historyEngineImpl) ReapplyEvents(
}
domainID := domainEntry.GetInfo().ID
// remove run id from the execution so that reapply events to the current run
execution := workflow.WorkflowExecution{
currentExecution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
}

return e.updateWorkflowExecutionWithAction(
ctx,
domainID,
execution,
currentExecution,
func(mutableState mutableState) (*updateWorkflowAction, error) {

postActions := &updateWorkflowAction{
Expand All @@ -2714,22 +2715,28 @@ func (e *historyEngineImpl) ReapplyEvents(
if !mutableState.IsWorkflowExecutionRunning() {
e.logger.Warn("cannot reapply event to a finished workflow",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(workflowID),
tag.WorkflowID(currentExecution.GetWorkflowId()),
)
e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount)
return &updateWorkflowAction{
noop: true,
}, nil
}
if err := e.eventsReapplier.reapplyEvents(
reappliedEvents, err := e.eventsReapplier.reapplyEvents(
ctx,
mutableState,
reapplyEvents,
); err != nil {
runID,
)
if err != nil {
e.logger.Error("failed to re-apply stale events", tag.Error(err))
return nil, &workflow.InternalServiceError{Message: "unable to re-apply stale events"}
}

if len(reappliedEvents) == 0 {
return &updateWorkflowAction{
noop: true,
}, nil
}
return postActions, nil
})
}
Expand Down
8 changes: 4 additions & 4 deletions service/history/historyEngine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
h "github.com/uber/cadence/.gen/go/history"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/persistence"
)

Expand Down Expand Up @@ -157,6 +158,8 @@ type (
IsSignalRequested(requestID string) bool
IsStickyTaskListEnabled() bool
IsWorkflowExecutionRunning() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID)
Load(*persistence.WorkflowMutableState)
ReplicateActivityInfo(*h.SyncActivityRequest, bool) error
ReplicateActivityTaskCancelRequestedEvent(*workflow.HistoryEvent) error
Expand Down
19 changes: 19 additions & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ type (
// domain entry contains a snapshot of domain
// NOTE: do not use the failover version inside, use currentVersion above
domainEntry *cache.DomainCacheEntry
// record if a event has been applied to mutable state
// TODO: persist this to db
appliedEvents map[string]struct{}

insertTransferTasks []persistence.Task
insertReplicationTasks []persistence.Task
Expand Down Expand Up @@ -181,6 +184,7 @@ func newMutableStateBuilder(
stateInDB: persistence.WorkflowStateVoid,
nextEventIDInDB: 0,
domainEntry: domainEntry,
appliedEvents: make(map[string]struct{}),

queryRegistry: newQueryRegistry(),

Expand Down Expand Up @@ -3962,6 +3966,21 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot(
return workflowSnapshot, workflowEventsSeq, nil
}

func (e *mutableStateBuilder) IsResourceDuplicated(
resourceDedupKey definition.DeduplicationID,
) bool {
id := definition.GenerateDeduplicationKey(resourceDedupKey)
_, duplicated := e.appliedEvents[id]
return duplicated
}

func (e *mutableStateBuilder) UpdateDuplicatedResource(
resourceDedupKey definition.DeduplicationID,
) {
id := definition.GenerateDeduplicationKey(resourceDedupKey)
e.appliedEvents[id] = struct{}{}
}

func (e *mutableStateBuilder) prepareCloseTransaction(
now time.Time,
transactionPolicy transactionPolicy,
Expand Down
13 changes: 13 additions & 0 deletions service/history/mutableStateBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -398,6 +399,18 @@ func (s *mutableStateSuite) TestMergeMapOfByteArray() {
s.Equal(2, len(resultMap))
}

func (s *mutableStateSuite) TestEventReapplied() {
runID := uuid.New()
eventID := int64(1)
version := int64(2)
dedupResource := definition.NewEventReappliedID(runID, eventID, version)
isReapplied := s.msBuilder.IsResourceDuplicated(dedupResource)
s.False(isReapplied)
s.msBuilder.UpdateDuplicatedResource(dedupResource)
isReapplied = s.msBuilder.IsResourceDuplicated(dedupResource)
s.True(isReapplied)
}

func (s *mutableStateSuite) prepareTransientDecisionCompletionFirstBatchReplicated(version int64, runID string) (*shared.HistoryEvent, *shared.HistoryEvent) {
domainID := testDomainID
execution := shared.WorkflowExecution{
Expand Down
27 changes: 27 additions & 0 deletions service/history/mutableState_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b4b4686

Please sign in to comment.