Skip to content

Commit

Permalink
Bugfix: timer active / failover processor should notify active proces…
Browse files Browse the repository at this point in the history
…sor (uber#1347)
  • Loading branch information
wxing1292 authored Dec 19, 2018
1 parent 720ddb5 commit bff2bc2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
11 changes: 5 additions & 6 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ Update_History_Loop:
// Update the task ID tracking the corresponding timer task.
ti.TaskID = TimerTaskStatusCreated
msBuilder.UpdateUserTimer(ti.TimerID, ti)
defer t.notifyNewTimers(timerTasks)
}

// Done!
Expand Down Expand Up @@ -457,7 +456,6 @@ Update_History_Loop:
return err
}

t.notifyNewTimers(timerTasks)
return nil
}

Expand Down Expand Up @@ -754,12 +752,13 @@ Update_History_Loop:

timersToNotify := append(timerTasks, msBuilder.GetContinueAsNew().TimerTasks...)
err = context.continueAsNewWorkflowExecution(nil, continueAsNewBuilder, transferTasks, timerTasks, transactionID)
t.notifyNewTimers(timersToNotify)

if err != nil {
if err == ErrConflict {
continue Update_History_Loop
}
} else {
t.historyService.timerProcessor.NotifyNewTimers(t.currentClusterName, t.shard.GetCurrentTime(t.currentClusterName), timersToNotify)
}
return err
}
Expand Down Expand Up @@ -789,7 +788,7 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
tBuilder := t.historyService.getTimerBuilder(context.getExecution())
tranT, timerT, err := t.historyService.getDeleteWorkflowTasks(executionInfo.DomainID, executionInfo.WorkflowID, tBuilder)
if err != nil {
return nil
return err
}
transferTasks = append(transferTasks, tranT)
timerTasks = append(timerTasks, timerT)
Expand All @@ -811,6 +810,6 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
return err
}

t.notifyNewTimers(timerTasks)
return err
t.historyService.timerProcessor.NotifyNewTimers(t.currentClusterName, t.shard.GetCurrentTime(t.currentClusterName), timerTasks)
return nil
}
35 changes: 19 additions & 16 deletions service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (

mockMetadataMgr *mocks.MetadataManager
mockVisibilityMgr *mocks.VisibilityManager
mockMatchingClient *mocks.MatchingClient
mockClusterMetadata *mocks.ClusterMetadata
}
)
Expand Down Expand Up @@ -80,6 +81,7 @@ func (s *timerQueueProcessorSuite) SetupTest() {
s.ShardContext.config.TransferProcessorUpdateAckInterval = dynamicconfig.GetDurationPropertyFn(100 * time.Millisecond)
s.ShardContext.config.TimerProcessorUpdateAckInterval = dynamicconfig.GetDurationPropertyFn(100 * time.Millisecond)

s.mockMatchingClient = &mocks.MatchingClient{}
s.mockClusterMetadata = &mocks.ClusterMetadata{}
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)

Expand All @@ -101,6 +103,7 @@ func (s *timerQueueProcessorSuite) SetupTest() {
s.engineImpl.txProcessor = newTransferQueueProcessor(
s.ShardContext, s.engineImpl, s.mockVisibilityMgr, &mocks.KafkaProducer{}, &mocks.MatchingClient{}, &mocks.HistoryClient{}, s.logger,
)
s.engineImpl.timerProcessor = newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.mockMatchingClient, s.logger)
}

func (s *timerQueueProcessorSuite) TearDownTest() {
Expand Down Expand Up @@ -280,7 +283,7 @@ func (s *timerQueueProcessorSuite) TestSingleTimerTask() {
s.NotEmpty(timerInfo, "Expected non empty timers list")
s.Equal(1, len(timerInfo))

processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
processor := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
processor.Start()
processor.NotifyNewTimers(cluster.TestCurrentClusterName, s.ShardContext.GetCurrentTime(cluster.TestCurrentClusterName), tt)

Expand All @@ -303,7 +306,7 @@ func (s *timerQueueProcessorSuite) TestManyTimerTasks() {
s.NotEmpty(timerInfo, "Expected non empty timers list")
s.Equal(1, len(timerInfo))

processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
processor := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
processor.Start()
processor.NotifyNewTimers(cluster.TestCurrentClusterName, s.ShardContext.GetCurrentTime(cluster.TestCurrentClusterName), tt)

Expand All @@ -326,7 +329,7 @@ func (s *timerQueueProcessorSuite) TestTimerTaskAfterProcessorStart() {
s.NoError(err, "No error expected.")
s.Empty(timerInfo, "Expected empty timers list")

processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
processor := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
processor.Start()

tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, common.NewRealTimeSource())
Expand Down Expand Up @@ -401,7 +404,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToStart_WithOutS
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToStart - Without Start
processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
processor := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
processor.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -444,7 +447,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToStart_WithStar
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToStart - With Start
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -489,7 +492,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToStart_MoreThan
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToStart - Without Start
processor := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
processor := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
processor.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -534,7 +537,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskStartToClose_WithStart()
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeStartToClose - Just start.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -577,7 +580,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskStartToClose_CompletedAc
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeStartToClose - Start and Completed activity.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -625,7 +628,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToClose_JustSche
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToClose - Just Scheduled.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -668,7 +671,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToClose_Started(
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToClose - Scheduled and started.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -713,7 +716,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskScheduleToClose_Complete
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToClose - Scheduled, started, completed.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -763,7 +766,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskHeartBeat_JustStarted()
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeHeartbeat - Scheduled, started.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, &mockTimeSource{currTime: time.Now()})
Expand All @@ -788,7 +791,7 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() {
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToClose - Scheduled, started, completed.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -852,7 +855,7 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers() {
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// Single timer.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, &mockTimeSource{currTime: time.Now()})
Expand All @@ -875,7 +878,7 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers_SameExpiry() {
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// Two timers.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down Expand Up @@ -926,7 +929,7 @@ func (s *timerQueueProcessorSuite) TestTimersOnClosedWorkflow() {
taskList := "closed-workflow-queue"
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.matchingClient, s.logger).(*timerQueueProcessorImpl)
p := s.engineImpl.timerProcessor.(*timerQueueProcessorImpl)
p.Start()

tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, &mockTimeSource{currTime: time.Now()})
Expand Down

0 comments on commit bff2bc2

Please sign in to comment.