Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate timer builder #2725

Merged
merged 12 commits into from
Oct 31, 2019
Merged
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ type (
StartedID int64
StartedEvent *workflow.HistoryEvent
StartedTime time.Time
DomainID string
ActivityID string
RequestID string
Details []byte
Expand All @@ -647,7 +648,6 @@ type (
TimerTaskStatus int32
// For retry
Attempt int32
DomainID string
StartedIdentity string
TaskList string
HasRetryPolicy bool
Expand Down
16 changes: 8 additions & 8 deletions service/history/MockTimerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,29 @@ func (_m *MockTimerQueueAckMgr) completeTimerTask(timerTask *persistence.TimerTa
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) getAckLevel() TimerSequenceID {
func (_m *MockTimerQueueAckMgr) getAckLevel() timerKey {
ret := _m.Called()

var r0 TimerSequenceID
if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
var r0 timerKey
if rf, ok := ret.Get(0).(func() timerKey); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(TimerSequenceID)
r0 = ret.Get(0).(timerKey)
}
}
return r0
}

func (_m *MockTimerQueueAckMgr) getReadLevel() TimerSequenceID {
func (_m *MockTimerQueueAckMgr) getReadLevel() timerKey {
ret := _m.Called()

var r0 TimerSequenceID
if rf, ok := ret.Get(0).(func() TimerSequenceID); ok {
var r0 timerKey
if rf, ok := ret.Get(0).(func() timerKey); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(TimerSequenceID)
r0 = ret.Get(0).(timerKey)
}
}
return r0
Expand Down
10 changes: 3 additions & 7 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (handler *decisionHandlerImpl) handleDecisionTaskScheduled(
}

return handler.historyEngine.updateWorkflowExecutionWithAction(ctx, domainID, execution,
func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) {
func(msBuilder mutableState) (*updateWorkflowAction, error) {
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (handler *decisionHandlerImpl) handleDecisionTaskStarted(

var resp *h.RecordDecisionTaskStartedResponse
err = handler.historyEngine.updateWorkflowExecutionWithAction(ctx, domainID, execution,
func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) {
func(msBuilder mutableState) (*updateWorkflowAction, error) {
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (handler *decisionHandlerImpl) handleDecisionTaskFailed(
}

return handler.historyEngine.updateWorkflowExecution(ctx, domainID, workflowExecution, true,
func(msBuilder mutableState, tBuilder *timerBuilder) error {
func(msBuilder mutableState) error {
if !msBuilder.IsWorkflowExecutionRunning() {
return ErrWorkflowCompleted
}
Expand Down Expand Up @@ -304,9 +304,6 @@ Update_History_Loop:
}

executionInfo := msBuilder.GetExecutionInfo()
timerBuilderProvider := func() *timerBuilder {
return handler.historyEngine.getTimerBuilder(context.getExecution())
}

scheduleID := token.ScheduleID
currentDecision, isRunning := msBuilder.GetDecisionInfo(scheduleID)
Expand Down Expand Up @@ -413,7 +410,6 @@ Update_History_Loop:
handler.decisionAttrValidator,
workflowSizeChecker,
handler.logger,
timerBuilderProvider,
handler.domainCache,
handler.metricsClient,
handler.config,
Expand Down
28 changes: 8 additions & 20 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
)

type (
timerBuilderProvider func() *timerBuilder

decisionAttrValidationFn func() error

decisionTaskHandlerImpl struct {
Expand All @@ -46,7 +44,6 @@ type (

// internal state
hasUnhandledEventsBeforeDecisions bool
timerBuilder *timerBuilder
failDecision bool
failDecisionCause *workflow.DecisionTaskFailedCause
failMessage *string
Expand All @@ -59,11 +56,10 @@ type (
attrValidator *decisionAttrValidator
sizeLimitChecker *workflowSizeChecker

logger log.Logger
timerBuilderProvider timerBuilderProvider
domainCache cache.DomainCache
metricsClient metrics.Client
config *Config
logger log.Logger
domainCache cache.DomainCache
metricsClient metrics.Client
config *Config
}
)

Expand All @@ -75,7 +71,6 @@ func newDecisionTaskHandler(
attrValidator *decisionAttrValidator,
sizeLimitChecker *workflowSizeChecker,
logger log.Logger,
timerBuilderProvider timerBuilderProvider,
domainCache cache.DomainCache,
metricsClient metrics.Client,
config *Config,
Expand All @@ -100,12 +95,10 @@ func newDecisionTaskHandler(
attrValidator: attrValidator,
sizeLimitChecker: sizeLimitChecker,

logger: logger,
timerBuilder: timerBuilderProvider(),
timerBuilderProvider: timerBuilderProvider,
domainCache: domainCache,
metricsClient: metricsClient,
config: config,
logger: logger,
domainCache: domainCache,
metricsClient: metricsClient,
config: config,
}
}

Expand Down Expand Up @@ -503,11 +496,6 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCancelTimer(
handler.identity)
switch err.(type) {
case nil:
// timer deletion is success. we need to rebuild the timer builder
// since timer builder has a local cached version of timers
handler.timerBuilder = handler.timerBuilderProvider()
handler.timerBuilder.loadUserTimers(handler.mutableState)

// timer deletion is a success, we may have deleted a fired timer in
// which case we should reset hasBufferedEvents
// TODO deletion of timer fired event refreshing hasUnhandledEventsBeforeDecisions
Expand Down
18 changes: 9 additions & 9 deletions service/history/historyCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (s *historyCacheSuite) TestHistoryCacheBasic() {
mockMS1 := NewMockmutableState(s.controller)
context, release, err := s.cache.getOrCreateWorkflowExecutionForBackground(domainID, execution1)
s.Nil(err)
context.(*workflowExecutionContextImpl).msBuilder = mockMS1
context.(*workflowExecutionContextImpl).mutableState = mockMS1
release(nil)
context, release, err = s.cache.getOrCreateWorkflowExecutionForBackground(domainID, execution1)
s.Nil(err)
s.Equal(mockMS1, context.(*workflowExecutionContextImpl).msBuilder)
s.Equal(mockMS1, context.(*workflowExecutionContextImpl).mutableState)
release(nil)

execution2 := workflow.WorkflowExecution{
Expand All @@ -149,7 +149,7 @@ func (s *historyCacheSuite) TestHistoryCacheBasic() {
}
context, release, err = s.cache.getOrCreateWorkflowExecutionForBackground(domainID, execution2)
s.Nil(err)
s.NotEqual(mockMS1, context.(*workflowExecutionContextImpl).msBuilder)
s.NotEqual(mockMS1, context.(*workflowExecutionContextImpl).mutableState)
release(nil)
}

Expand Down Expand Up @@ -201,21 +201,21 @@ func (s *historyCacheSuite) TestHistoryCacheClear() {
s.Nil(err)
// since we are just testing whether the release function will clear the cache
// all we need is a fake msBuilder
context.(*workflowExecutionContextImpl).msBuilder = &mutableStateBuilder{}
context.(*workflowExecutionContextImpl).mutableState = &mutableStateBuilder{}
release(nil)

// since last time, the release function receive a nil error
// the ms builder will not be cleared
context, release, err = s.cache.getOrCreateWorkflowExecutionForBackground(domainID, we)
s.Nil(err)
s.NotNil(context.(*workflowExecutionContextImpl).msBuilder)
s.NotNil(context.(*workflowExecutionContextImpl).mutableState)
release(errors.New("some random error message"))

// since last time, the release function receive a non-nil error
// the ms builder will be cleared
context, release, err = s.cache.getOrCreateWorkflowExecutionForBackground(domainID, we)
s.Nil(err)
s.Nil(context.(*workflowExecutionContextImpl).msBuilder)
s.Nil(context.(*workflowExecutionContextImpl).mutableState)
release(nil)
}

Expand All @@ -236,10 +236,10 @@ func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() {
context, release, err := s.cache.getOrCreateWorkflowExecutionForBackground(domainID, we)
s.Nil(err)
// since each time the builder is reset to nil
s.Nil(context.(*workflowExecutionContextImpl).msBuilder)
s.Nil(context.(*workflowExecutionContextImpl).mutableState)
// since we are just testing whether the release function will clear the cache
// all we need is a fake msBuilder
context.(*workflowExecutionContextImpl).msBuilder = &mutableStateBuilder{}
context.(*workflowExecutionContextImpl).mutableState = &mutableStateBuilder{}
release(errors.New("some random error message"))
waitGroup.Done()
}
Expand All @@ -255,6 +255,6 @@ func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() {
s.Nil(err)
// since we are just testing whether the release function will clear the cache
// all we need is a fake msBuilder
s.Nil(context.(*workflowExecutionContextImpl).msBuilder)
s.Nil(context.(*workflowExecutionContextImpl).mutableState)
release(nil)
}
Loading