From af42a50bf4f97f5e6273f4acbe482c6827e8359f Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Wed, 24 Jul 2019 17:57:42 -0700 Subject: [PATCH] Implement Sticky TTL (#2264) --- common/service/dynamicconfig/constants.go | 3 + .../history/MockWorkflowExecutionContext.go | 15 ++++ service/history/conflictResolver.go | 1 + service/history/historyBuilder_test.go | 2 +- service/history/historyEngine.go | 34 ++++---- service/history/historyEngine2_test.go | 84 +++++++++++++++++++ .../history/historyEngine3_eventsv2_test.go | 2 + service/history/historyEngine_test.go | 4 +- service/history/historyReplicator.go | 7 +- service/history/mutableStateBuilder.go | 18 +++- service/history/mutableStateBuilder_test.go | 2 +- service/history/service.go | 4 + service/history/stateBuilder.go | 1 + service/history/stateBuilder_test.go | 2 + service/history/timerBuilder_test.go | 10 +-- service/history/workflowExecutionContext.go | 2 + service/history/workflowResetor.go | 9 +- 17 files changed, 169 insertions(+), 31 deletions(-) diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index ab0cc727a66..6b7f462d1eb 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -180,6 +180,7 @@ var keys = map[Key]string{ ArchiveRequestRPS: "history.archiveRequestRPS", EmitShardDiffLog: "history.emitShardDiffLog", HistoryThrottledLogRPS: "history.throttledLogRPS", + StickyTTL: "history.stickyTTL", WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", WorkerReplicatorMetaTaskConcurrency: "worker.replicatorMetaTaskConcurrency", @@ -480,6 +481,8 @@ const ( EnableEventsV2 // HistoryThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger HistoryThrottledLogRPS + // StickyTTL is to expire a sticky tasklist if no update more than this duration + StickyTTL // key for worker diff --git a/service/history/MockWorkflowExecutionContext.go b/service/history/MockWorkflowExecutionContext.go index 788ff7d00ae..bc7491d3f0c 100644 --- a/service/history/MockWorkflowExecutionContext.go +++ b/service/history/MockWorkflowExecutionContext.go @@ -42,6 +42,21 @@ func (_m *mockWorkflowExecutionContext) clear() { _m.Called() } +func (_m *mockWorkflowExecutionContext) getDomainName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(string) + } + } + + return r0 +} + func (_m *mockWorkflowExecutionContext) getDomainID() string { ret := _m.Called() diff --git a/service/history/conflictResolver.go b/service/history/conflictResolver.go index 60709201502..1a63a9d2a35 100644 --- a/service/history/conflictResolver.go +++ b/service/history/conflictResolver.go @@ -122,6 +122,7 @@ func (r *conflictResolverImpl) reset( // if can see replication task, meaning that domain is // global domain with > 1 target clusters cache.ReplicationPolicyMultiCluster, + r.context.getDomainName(), ) resetMutableStateBuilder.executionInfo.EventStoreVersion = eventStoreVersion diff --git a/service/history/historyBuilder_test.go b/service/history/historyBuilder_test.go index 4d9d4cf635a..ac9b8e4e248 100644 --- a/service/history/historyBuilder_test.go +++ b/service/history/historyBuilder_test.go @@ -76,7 +76,7 @@ func (s *historyBuilderSuite) SetupTest() { } s.mockEventsCache = &MockEventsCache{} s.msBuilder = newMutableStateBuilder(s.mockShard, s.mockEventsCache, - s.logger) + s.logger, "") s.builder = newHistoryBuilder(s.msBuilder, s.logger) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 95376c74484..1835a75a819 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -306,12 +306,14 @@ func (e *historyEngineImpl) createMutableState( e.logger, domainEntry.GetFailoverVersion(), domainEntry.GetReplicationPolicy(), + domainEntry.GetInfo().Name, ) } else { msBuilder = newMutableStateBuilder( e.shard, e.shard.GetEventsCache(), e.logger, + domainEntry.GetInfo().Name, ) } @@ -592,22 +594,24 @@ func (e *historyEngineImpl) getMutableState( executionInfo := msBuilder.GetExecutionInfo() execution.RunId = context.getExecution().RunId retResp = &h.GetMutableStateResponse{ - Execution: &execution, - WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(executionInfo.WorkflowTypeName)}, - LastFirstEventId: common.Int64Ptr(msBuilder.GetLastFirstEventID()), - NextEventId: common.Int64Ptr(msBuilder.GetNextEventID()), - PreviousStartedEventId: common.Int64Ptr(msBuilder.GetPreviousStartedEventID()), - TaskList: &workflow.TaskList{Name: common.StringPtr(executionInfo.TaskList)}, - StickyTaskList: &workflow.TaskList{Name: common.StringPtr(executionInfo.StickyTaskList)}, - ClientLibraryVersion: common.StringPtr(executionInfo.ClientLibraryVersion), - ClientFeatureVersion: common.StringPtr(executionInfo.ClientFeatureVersion), - ClientImpl: common.StringPtr(executionInfo.ClientImpl), - IsWorkflowRunning: common.BoolPtr(msBuilder.IsWorkflowExecutionRunning()), - StickyTaskListScheduleToStartTimeout: common.Int32Ptr(executionInfo.StickyScheduleToStartTimeout), - EventStoreVersion: common.Int32Ptr(msBuilder.GetEventStoreVersion()), - BranchToken: msBuilder.GetCurrentBranch(), + Execution: &execution, + WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(executionInfo.WorkflowTypeName)}, + LastFirstEventId: common.Int64Ptr(msBuilder.GetLastFirstEventID()), + NextEventId: common.Int64Ptr(msBuilder.GetNextEventID()), + PreviousStartedEventId: common.Int64Ptr(msBuilder.GetPreviousStartedEventID()), + TaskList: &workflow.TaskList{Name: common.StringPtr(executionInfo.TaskList)}, + ClientLibraryVersion: common.StringPtr(executionInfo.ClientLibraryVersion), + ClientFeatureVersion: common.StringPtr(executionInfo.ClientFeatureVersion), + ClientImpl: common.StringPtr(executionInfo.ClientImpl), + IsWorkflowRunning: common.BoolPtr(msBuilder.IsWorkflowExecutionRunning()), + EventStoreVersion: common.Int32Ptr(msBuilder.GetEventStoreVersion()), + BranchToken: msBuilder.GetCurrentBranch(), + } + + if msBuilder.IsStickyTaskListEnabled() { + retResp.StickyTaskList = &workflow.TaskList{Name: common.StringPtr(executionInfo.StickyTaskList)} + retResp.StickyTaskListScheduleToStartTimeout = common.Int32Ptr(executionInfo.StickyScheduleToStartTimeout) } - replicationState := msBuilder.GetReplicationState() if replicationState != nil { retResp.ReplicationInfo = map[string]*workflow.ReplicationInfo{} diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index bda517cc72d..c1ccd6c92cd 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -25,6 +25,7 @@ import ( "encoding/json" "errors" "testing" + "time" "github.com/pborman/uuid" "github.com/stretchr/testify/mock" @@ -195,6 +196,88 @@ func (s *engine2Suite) TearDownTest() { s.mockTimerProcessor.AssertExpectations(s.T()) } +func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyExpired() { + domainID := validDomainID + we := workflow.WorkflowExecution{ + WorkflowId: common.StringPtr("wId"), + RunId: common.StringPtr(validRunID), + } + tl := "testTaskList" + stickyTl := "stickyTaskList" + identity := "testIdentity" + + msBuilder := newMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache, + loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId()) + executionInfo := msBuilder.GetExecutionInfo() + executionInfo.StickyTaskList = stickyTl + + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + di := addDecisionTaskScheduledEvent(msBuilder) + + ms := createMutableState(msBuilder) + + gwmsResponse := &p.GetWorkflowExecutionResponse{State: ms} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}, + }, nil).Once() + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( + &p.GetDomainResponse{ + Info: &p.DomainInfo{ID: domainID}, + Config: &p.DomainConfig{Retention: 1}, + ReplicationConfig: &p.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*p.ClusterReplicationConfig{ + &p.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + }, + }, + TableVersion: p.DomainTableVersionV1, + }, + nil, + ) + + request := h.RecordDecisionTaskStartedRequest{ + DomainUUID: common.StringPtr(domainID), + WorkflowExecution: &we, + ScheduleId: common.Int64Ptr(2), + TaskId: common.Int64Ptr(100), + RequestId: common.StringPtr("reqId"), + PollRequest: &workflow.PollForDecisionTaskRequest{ + TaskList: &workflow.TaskList{ + Name: common.StringPtr(stickyTl), + }, + Identity: common.StringPtr(identity), + }, + } + + expectedResponse := h.RecordDecisionTaskStartedResponse{} + expectedResponse.WorkflowType = msBuilder.GetWorkflowType() + executionInfo = msBuilder.GetExecutionInfo() + if executionInfo.LastProcessedEvent != common.EmptyEventID { + expectedResponse.PreviousStartedEventId = common.Int64Ptr(executionInfo.LastProcessedEvent) + } + expectedResponse.ScheduledEventId = common.Int64Ptr(di.ScheduleID) + expectedResponse.StartedEventId = common.Int64Ptr(di.ScheduleID + 1) + expectedResponse.StickyExecutionEnabled = common.BoolPtr(false) + expectedResponse.NextEventId = common.Int64Ptr(msBuilder.GetNextEventID() + 1) + expectedResponse.Attempt = common.Int64Ptr(di.Attempt) + expectedResponse.WorkflowExecutionTaskList = common.TaskListPtr(workflow.TaskList{ + Name: &executionInfo.TaskList, + Kind: common.TaskListKindPtr(workflow.TaskListKindNormal), + }) + expectedResponse.EventStoreVersion = common.Int32Ptr(p.EventStoreVersionV2) + expectedResponse.BranchToken = msBuilder.GetCurrentBranch() + + response, err := s.historyEngine.RecordDecisionTaskStarted(context.Background(), &request) + s.Nil(err) + s.NotNil(response) + expectedResponse.StartedTimestamp = response.StartedTimestamp + expectedResponse.ScheduledTimestamp = common.Int64Ptr(0) + s.Equal(&expectedResponse, response) +} + func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { domainID := validDomainID we := workflow.WorkflowExecution{ @@ -208,6 +291,7 @@ func (s *engine2Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { msBuilder := newMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache, loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId()) executionInfo := msBuilder.GetExecutionInfo() + executionInfo.LastUpdatedTimestamp = time.Now() executionInfo.StickyTaskList = stickyTl addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) diff --git a/service/history/historyEngine3_eventsv2_test.go b/service/history/historyEngine3_eventsv2_test.go index c1670947370..5b073630a7f 100644 --- a/service/history/historyEngine3_eventsv2_test.go +++ b/service/history/historyEngine3_eventsv2_test.go @@ -23,6 +23,7 @@ package history import ( "context" "testing" + "time" "github.com/pborman/uuid" "github.com/stretchr/testify/mock" @@ -209,6 +210,7 @@ func (s *engine3Suite) TestRecordDecisionTaskStartedSuccessStickyEnabled() { msBuilder := newMutableStateBuilderWithEventV2(s.historyEngine.shard, s.mockEventsCache, loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId()) executionInfo := msBuilder.GetExecutionInfo() + executionInfo.LastUpdatedTimestamp = time.Now() executionInfo.StickyTaskList = stickyTl addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 03c37c1b54f..e7832719b51 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -5126,7 +5126,7 @@ func addCompleteWorkflowEvent(builder mutableState, decisionCompletedEventID int func newMutableStateBuilderWithEventV2(shard ShardContext, eventsCache eventsCache, logger log.Logger, runID string) *mutableStateBuilder { - msBuilder := newMutableStateBuilder(shard, eventsCache, logger) + msBuilder := newMutableStateBuilder(shard, eventsCache, logger, "") _ = msBuilder.SetHistoryTree(runID) return msBuilder @@ -5135,7 +5135,7 @@ func newMutableStateBuilderWithEventV2(shard ShardContext, eventsCache eventsCac func newMutableStateBuilderWithReplicationStateWithEventV2(shard ShardContext, eventsCache eventsCache, logger log.Logger, version int64, runID string) *mutableStateBuilder { - msBuilder := newMutableStateBuilderWithReplicationState(shard, eventsCache, logger, version, cache.ReplicationPolicyOneCluster) + msBuilder := newMutableStateBuilderWithReplicationState(shard, eventsCache, logger, version, cache.ReplicationPolicyOneCluster, "") _ = msBuilder.SetHistoryTree(runID) return msBuilder diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 6c121ba34f2..1233335b139 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -52,7 +52,7 @@ var ( type ( conflictResolverProvider func(context workflowExecutionContext, logger log.Logger) conflictResolver stateBuilderProvider func(msBuilder mutableState, logger log.Logger) stateBuilder - mutableStateProvider func(version int64, logger log.Logger) mutableState + mutableStateProvider func(version int64, logger log.Logger, domainName string) mutableState historyReplicator struct { shard ShardContext @@ -146,7 +146,7 @@ func newHistoryReplicator( getNewStateBuilder: func(msBuilder mutableState, logger log.Logger) stateBuilder { return newStateBuilder(shard, msBuilder, logger) }, - getNewMutableState: func(version int64, logger log.Logger) mutableState { + getNewMutableState: func(version int64, logger log.Logger, domainName string) mutableState { return newMutableStateBuilderWithReplicationState( shard, shard.GetEventsCache(), @@ -155,6 +155,7 @@ func newHistoryReplicator( // if can see replication task, meaning that domain is // global domain with > 1 target clusters cache.ReplicationPolicyMultiCluster, + domainName, ) }, } @@ -435,7 +436,7 @@ func (r *historyReplicator) ApplyStartEvent( logger log.Logger, ) error { - msBuilder := r.getNewMutableState(request.GetVersion(), logger) + msBuilder := r.getNewMutableState(request.GetVersion(), logger, context.getDomainName()) err := r.ApplyReplicationTask(ctx, context, msBuilder, request, logger) return err } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 722d8593522..b34d9604edf 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -107,6 +107,7 @@ type ( config *Config timeSource clock.TimeSource logger log.Logger + domainName string } ) @@ -116,6 +117,7 @@ func newMutableStateBuilder( shard ShardContext, eventsCache eventsCache, logger log.Logger, + domainName string, ) *mutableStateBuilder { s := &mutableStateBuilder{ updateActivityInfos: make(map[*persistence.ActivityInfo]struct{}), @@ -153,6 +155,7 @@ func newMutableStateBuilder( config: shard.GetConfig(), timeSource: shard.GetTimeSource(), logger: logger, + domainName: domainName, } s.executionInfo = &persistence.WorkflowExecutionInfo{ DecisionVersion: common.EmptyVersion, @@ -177,8 +180,9 @@ func newMutableStateBuilderWithReplicationState( logger log.Logger, version int64, replicationPolicy cache.ReplicationPolicy, + domainName string, ) *mutableStateBuilder { - s := newMutableStateBuilder(shard, eventsCache, logger) + s := newMutableStateBuilder(shard, eventsCache, logger, domainName) s.replicationState = &persistence.ReplicationState{ StartVersion: version, CurrentVersion: version, @@ -586,7 +590,14 @@ func (e *mutableStateBuilder) assignTaskIDToEvents() error { } func (e *mutableStateBuilder) IsStickyTaskListEnabled() bool { - return len(e.executionInfo.StickyTaskList) > 0 + if e.executionInfo.StickyTaskList == "" { + return false + } + maxDu := e.config.StickyTTL(e.domainName) + if e.timeSource.Now().After(e.executionInfo.LastUpdatedTimestamp.Add(maxDu)) { + return false + } + return true } func (e *mutableStateBuilder) CreateNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent { @@ -2886,9 +2897,10 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( e.logger, e.GetCurrentVersion(), e.replicationPolicy, + e.domainName, ) } else { - newStateBuilder = newMutableStateBuilder(e.shard, e.eventsCache, e.logger) + newStateBuilder = newMutableStateBuilder(e.shard, e.eventsCache, e.logger, e.domainName) } domainID := domainEntry.GetInfo().ID startedEvent, err := newStateBuilder.addWorkflowExecutionStartedEventForContinueAsNew(domainEntry, parentInfo, newExecution, e, attributes, firstRunID) diff --git a/service/history/mutableStateBuilder_test.go b/service/history/mutableStateBuilder_test.go index 4b4439c2a5d..28eff11c726 100644 --- a/service/history/mutableStateBuilder_test.go +++ b/service/history/mutableStateBuilder_test.go @@ -72,7 +72,7 @@ func (s *mutableStateSuite) SetupTest() { } s.mockEventsCache = &MockEventsCache{} s.msBuilder = newMutableStateBuilder(s.mockShard, s.mockEventsCache, - s.logger) + s.logger, "") } func (s *mutableStateSuite) TearDownTest() { diff --git a/service/history/service.go b/service/history/service.go index 36aeadbdca4..fc670488522 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -157,6 +157,9 @@ type Config struct { SearchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithDomainFilter SearchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithDomainFilter SearchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithDomainFilter + + // StickyTTL is to expire a sticky tasklist if no update more than this duration + StickyTTL dynamicconfig.DurationPropertyFnWithDomainFilter } const ( @@ -254,6 +257,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilit SearchAttributesNumberOfKeysLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesNumberOfKeysLimit, 100), SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024), SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024), + StickyTTL: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.StickyTTL, time.Hour*24*365), } return cfg diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go index f6cdac4cbe2..e067a60d0d7 100644 --- a/service/history/stateBuilder.go +++ b/service/history/stateBuilder.go @@ -520,6 +520,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha b.logger, newRunStartedEvent.GetVersion(), domainEntry.GetReplicationPolicy(), + domainEntry.GetInfo().Name, ) newRunStateBuilder := newStateBuilder(b.shard, newRunMutableStateBuilder, b.logger) diff --git a/service/history/stateBuilder_test.go b/service/history/stateBuilder_test.go index ea2146299a0..f937c3be8f7 100644 --- a/service/history/stateBuilder_test.go +++ b/service/history/stateBuilder_test.go @@ -581,6 +581,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA s.logger, newRunStartedEvent.GetVersion(), cache.ReplicationPolicyMultiCluster, + domainName, ) err = expectedNewRunStateBuilder.ReplicateWorkflowExecutionStartedEvent( cache.NewLocalDomainCacheEntryForTest(&persistence.DomainInfo{ID: domainID}, &persistence.DomainConfig{}, "", nil), @@ -911,6 +912,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA s.logger, newRunStartedEvent.GetVersion(), cache.ReplicationPolicyMultiCluster, + domainName, ) err = expectedNewRunStateBuilder.ReplicateWorkflowExecutionStartedEvent( cache.NewLocalDomainCacheEntryForTest(&persistence.DomainInfo{ID: domainID}, &persistence.DomainConfig{}, "", nil), diff --git a/service/history/timerBuilder_test.go b/service/history/timerBuilder_test.go index ae21f3d2043..9e3fecf015d 100644 --- a/service/history/timerBuilder_test.go +++ b/service/history/timerBuilder_test.go @@ -86,7 +86,7 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderSingleUserTimer() { tb := newTimerBuilder(s.logger, &mockTimeSource{currTime: time.Now()}) // Add one timer. - msb := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger) + msb := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger, "") msb.Load(&persistence.WorkflowMutableState{ ExecutionInfo: &persistence.WorkflowExecutionInfo{NextEventID: int64(201)}, TimerInfos: make(map[string]*persistence.TimerInfo), @@ -119,7 +119,7 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderMulitpleUserTimer() { // Add two timers. (before and after) tp := &persistence.TimerInfo{TimerID: "tid1", StartedID: 201, TaskID: 101, ExpiryTime: time.Now().Add(10 * time.Second)} timerInfos := map[string]*persistence.TimerInfo{"tid1": tp} - msb := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger) + msb := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger, "") msb.Load(&persistence.WorkflowMutableState{ ExecutionInfo: &persistence.WorkflowExecutionInfo{NextEventID: int64(202)}, TimerInfos: timerInfos, @@ -148,7 +148,7 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderMulitpleUserTimer() { tb = newTimerBuilder(s.logger, &mockTimeSource{currTime: time.Now()}) tp2 := &persistence.TimerInfo{TimerID: "tid1", StartedID: 201, TaskID: TimerTaskStatusNone, ExpiryTime: time.Now().Add(10 * time.Second)} timerInfos = map[string]*persistence.TimerInfo{"tid1": tp2} - msb = newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger) + msb = newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger, "") msb.Load(&persistence.WorkflowMutableState{ ExecutionInfo: &persistence.WorkflowExecutionInfo{NextEventID: int64(203)}, TimerInfos: timerInfos, @@ -176,7 +176,7 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderMulitpleUserTimer() { func (s *timerBuilderProcessorSuite) TestTimerBuilderDuplicateTimerID() { tp := &persistence.TimerInfo{TimerID: "tid-exist", StartedID: 201, TaskID: 101, ExpiryTime: time.Now().Add(10 * time.Second)} timerInfos := map[string]*persistence.TimerInfo{"tid-exist": tp} - msb := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger) + msb := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger, "") msb.Load(&persistence.WorkflowMutableState{ ExecutionInfo: &persistence.WorkflowExecutionInfo{NextEventID: int64(203)}, TimerInfos: timerInfos, @@ -192,7 +192,7 @@ func (s *timerBuilderProcessorSuite) TestTimerBuilderDuplicateTimerID() { func (s *timerBuilderProcessorSuite) TestTimerBuilder_GetActivityTimer() { // ScheduleToStart being more than HB. - builder := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger) + builder := newMutableStateBuilder(s.mockShard, s.mockEventsCache, s.logger, "") ase, ai, err := builder.AddActivityTaskScheduledEvent(common.EmptyEventID, &workflow.ScheduleActivityTaskDecisionAttributes{ ActivityId: common.StringPtr("test-id"), diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index 28317ec5545..247c969fb01 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -38,6 +38,7 @@ import ( type ( workflowExecutionContext interface { + getDomainName() string getDomainID() string getExecution() *workflow.WorkflowExecution getLogger() log.Logger @@ -252,6 +253,7 @@ func (c *workflowExecutionContextImpl) loadWorkflowExecutionInternal() error { c.shard, c.shard.GetEventsCache(), c.logger, + c.getDomainName(), ) c.msBuilder.Load(response.State) c.stats = response.State.ExecutionStats diff --git a/service/history/workflowResetor.go b/service/history/workflowResetor.go index 29f1c2d7dbf..8c3890abd07 100644 --- a/service/history/workflowResetor.go +++ b/service/history/workflowResetor.go @@ -623,9 +623,10 @@ func (w *workflowResetorImpl) replayHistoryEvents( w.eng.logger, firstEvent.GetVersion(), domainEntry.GetReplicationPolicy(), + domainEntry.GetInfo().Name, ) } else { - resetMutableState = newMutableStateBuilder(w.eng.shard, w.eng.shard.GetEventsCache(), w.eng.logger) + resetMutableState = newMutableStateBuilder(w.eng.shard, w.eng.shard.GetEventsCache(), w.eng.logger, domainEntry.GetInfo().Name) } resetMutableState.executionInfo.EventStoreVersion = persistence.EventStoreVersionV2 @@ -830,6 +831,11 @@ func (w *workflowResetorImpl) replicateResetEvent( var sBuilder stateBuilder var wfTimeoutSecs int64 + domainEntry, retError := w.eng.shard.GetDomainCache().GetDomainByID(domainID) + if retError != nil { + return + } + // replay old history from beginning of the baseRun upto decisionFinishEventID(exclusive) var nextPageToken []byte var lastEvent *workflow.HistoryEvent @@ -861,6 +867,7 @@ func (w *workflowResetorImpl) replicateResetEvent( // if can see replication task, meaning that domain is // global domain with > 1 target clusters cache.ReplicationPolicyMultiCluster, + domainEntry.GetInfo().Name, ) newMsBuilder.GetExecutionInfo().EventStoreVersion = persistence.EventStoreVersionV2 sBuilder = newStateBuilder(w.eng.shard, newMsBuilder, w.eng.logger)