Handle deletion of task during failover (uber#976)
* Adding failover info persistence struct
* Adding awareness of failover info during transfer / timer task cleanup

Note: persistence to the database is still missing; persisting the
failover cursor should be done along with bulk failover for performance.
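In practice, "awareness of failover info during task cleanup" means the cleanup path must not delete tasks that an in-flight failover scan still needs. A minimal sketch of that guard, assuming it sits in service/history; the helper name and the clusterAckLevel parameter are illustrative, and only GetAllTransferFailoverLevels and TransferFailoverLevel come from this commit:

```go
// Hypothetical helper: the highest transfer task ID that is safe to delete.
// Everything below the regular ack level is normally reclaimable, but each
// in-flight failover scan still needs tasks at or above its CurrentLevel.
func safeTransferDeleteLevel(shard ShardContext, clusterAckLevel int64) int64 {
	safeLevel := clusterAckLevel
	for _, level := range shard.GetAllTransferFailoverLevels() {
		if level.CurrentLevel < safeLevel {
			safeLevel = level.CurrentLevel
		}
	}
	return safeLevel
}
```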
wxing1292 authored Jul 24, 2018
1 parent 9b832fa commit 287a0da
Showing 20 changed files with 447 additions and 149 deletions.
18 changes: 18 additions & 0 deletions common/persistence/dataInterfaces.go
@@ -143,9 +143,27 @@ type (
		TimerAckLevel             time.Time
		ClusterTransferAckLevel   map[string]int64
		ClusterTimerAckLevel      map[string]time.Time
		TransferFailoverLevels    map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
		TimerFailoverLevels       map[string]TimerFailoverLevel    // uuid -> TimerFailoverLevel
		DomainNotificationVersion int64
	}

	// TransferFailoverLevel contains domain IDs and corresponding start / end level
	TransferFailoverLevel struct {
		MinLevel     int64
		CurrentLevel int64
		MaxLevel     int64
		DomainIDs    []string
	}

	// TimerFailoverLevel contains domain IDs and corresponding start / end level
	TimerFailoverLevel struct {
		MinLevel     time.Time
		CurrentLevel time.Time
		MaxLevel     time.Time
		DomainIDs    []string
	}

	// WorkflowExecutionInfo describes a workflow execution
	WorkflowExecutionInfo struct {
		DomainID string
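Both structs are scan cursors: MinLevel is where a failover scan starts, MaxLevel is where it ends, and CurrentLevel tracks progress between the two. A hedged sketch of building a transfer cursor, assuming it lives in service/history; the constructor and its argument sources are not part of this commit:

```go
// Hypothetical constructor for a transfer failover cursor; the argument
// sources (pre-failover ack level, captured max read level, domain set)
// are assumptions about calling code that is not in this diff.
func newTransferFailoverLevel(ackLevel, maxReadLevel int64, domainIDs []string) persistence.TransferFailoverLevel {
	return persistence.TransferFailoverLevel{
		MinLevel:     ackLevel,     // scan starts at the pre-failover ack level
		CurrentLevel: ackLevel,     // advances as failover tasks are acked
		MaxLevel:     maxReadLevel, // scan stops at the read level captured when failover begins
		DomainIDs:    domainIDs,    // domains being failed over
	}
}
```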
13 changes: 13 additions & 0 deletions service/history/MockProcessor.go
@@ -97,3 +97,16 @@ func (_m *MockProcessor) updateAckLevel(taskID int64) error {
	}
	return r0
}

// queueShutdown is the mock implementation for queueShutdown of Processor
func (_m *MockProcessor) queueShutdown() error {
	ret := _m.Called()

	var r0 error
	if rf, ok := ret.Get(0).(func() error); ok {
		r0 = rf()
	} else {
		r0 = ret.Error(0)
	}
	return r0
}
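This is the standard testify mock pattern: the stub either invokes a caller-supplied function or returns the canned value registered with On(...).Return(...). A short usage sketch mirroring the queueAckMgr_test.go setup below, assuming a testify suite receiver s:

```go
// Sketch of driving the new mock from a testify suite; the receiver s and
// its embedded assertions are assumed.
s.mockProcessor.On("queueShutdown").Return(nil)
s.NoError(s.mockProcessor.queueShutdown()) // returns the canned nil
s.mockProcessor.AssertExpectations(s.T())  // verifies queueShutdown was called
```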
1 change: 1 addition & 0 deletions service/history/historyEngineInterfaces.go
@@ -112,6 +112,7 @@ type (
		readTasks(readLevel int64) ([]queueTaskInfo, bool, error)
		completeTask(taskID int64) error
		updateAckLevel(taskID int64) error
		queueShutdown() error
	}

	transferQueueProcessor interface {
60 changes: 60 additions & 0 deletions service/history/historyTestBase.go
@@ -246,6 +246,66 @@ func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
	return nil
}

// UpdateTransferFailoverLevel test implementation
func (s *TestShardContext) UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error {
	s.Lock()
	defer s.Unlock()

	s.shardInfo.TransferFailoverLevels[failoverID] = level
	return nil
}

// DeleteTransferFailoverLevel test implementation
func (s *TestShardContext) DeleteTransferFailoverLevel(failoverID string) error {
	s.Lock()
	defer s.Unlock()

	delete(s.shardInfo.TransferFailoverLevels, failoverID)
	return nil
}

// GetAllTransferFailoverLevels test implementation
func (s *TestShardContext) GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel {
	s.RLock()
	defer s.RUnlock()

	ret := map[string]persistence.TransferFailoverLevel{}
	for k, v := range s.shardInfo.TransferFailoverLevels {
		ret[k] = v
	}
	return ret
}

// UpdateTimerFailoverLevel test implementation
func (s *TestShardContext) UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error {
	s.Lock()
	defer s.Unlock()

	s.shardInfo.TimerFailoverLevels[failoverID] = level
	return nil
}

// DeleteTimerFailoverLevel test implementation
func (s *TestShardContext) DeleteTimerFailoverLevel(failoverID string) error {
	s.Lock()
	defer s.Unlock()

	delete(s.shardInfo.TimerFailoverLevels, failoverID)
	return nil
}

// GetAllTimerFailoverLevels test implementation
func (s *TestShardContext) GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel {
	s.RLock()
	defer s.RUnlock()

	ret := map[string]persistence.TimerFailoverLevel{}
	for k, v := range s.shardInfo.TimerFailoverLevels {
		ret[k] = v
	}
	return ret
}

// GetDomainNotificationVersion test implementation
func (s *TestShardContext) GetDomainNotificationVersion() int64 {
	s.RLock()
17 changes: 8 additions & 9 deletions service/history/queueAckMgr.go
@@ -191,19 +191,18 @@ MoveAckLevelLoop:
	a.ackLevel = ackLevel

	if a.isFailover && a.isReadFinished && len(a.outstandingTasks) == 0 {
		a.Unlock()
		// this means in failover mode, all possible failover transfer tasks
		// are processed and we are free to shut down
-		a.logger.Debugf("Queue ack manager shutdoen.")
+		a.logger.Debugf("Queue ack manager shutdown.")
		a.finishedChan <- struct{}{}
+		a.processor.queueShutdown()
		return
	}
-	a.Unlock()

-	if !a.isFailover {
-		if err := a.processor.updateAckLevel(ackLevel); err != nil {
-			a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter)
-			logging.LogOperationFailedEvent(a.logger, "Error updating ack level for shard", err)
-		}
-	} else {
-		// TODO deal with failover ack level persistence, issue #646
-	}
+	a.Unlock()
+	if err := a.processor.updateAckLevel(ackLevel); err != nil {
+		a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter)
+		logging.LogOperationFailedEvent(a.logger, "Error updating ack level for shard", err)
+	}
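The new contract: once a failover queue has read everything and its outstanding task map drains, the ack manager signals finishedChan and calls the processor's queueShutdown hook, rather than updating a shard ack level it does not own. A speculative sketch of what a failover processor's hook might do with the shard methods added in this commit; the type, its fields, and the Stop call are assumptions, and only the replicator's no-op implementation appears in this diff:

```go
// Hypothetical queueShutdown for a failover transfer queue processor;
// transferQueueFailoverProcessorImpl, its fields, and Stop() are assumed.
func (t *transferQueueFailoverProcessorImpl) queueShutdown() error {
	t.queueProcessorBase.Stop() // stop the processor pump
	// drop the shard's failover cursor now that the scan is complete
	return t.shard.DeleteTransferFailoverLevel(t.failoverUUID)
}
```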
6 changes: 4 additions & 2 deletions service/history/queueAckMgr_test.go
@@ -358,7 +358,7 @@ func (s *queueFailoverAckMgrSuite) TearDownTest() {

}

-func (s *queueFailoverAckMgrSuite) TestReadTimerTasks() {
+func (s *queueFailoverAckMgrSuite) TestReadQueueTasks() {
	readLevel := s.queueFailoverAckMgr.readLevel
	// when the ack manager is first initialized, read == ack level
	s.Equal(s.queueFailoverAckMgr.getQueueAckLevel(), readLevel)
@@ -410,7 +410,7 @@ func (s *queueFailoverAckMgrSuite) TestReadTimerTasks() {
	s.True(s.queueFailoverAckMgr.isReadFinished)
}

-func (s *queueFailoverAckMgrSuite) TestReadCompleteTimerTasks() {
+func (s *queueFailoverAckMgrSuite) TestReadCompleteQueueTasks() {
	readLevel := s.queueFailoverAckMgr.readLevel
	// when the ack manager is first initialized, read == ack level
	s.Equal(s.queueFailoverAckMgr.getQueueAckLevel(), readLevel)
@@ -450,6 +450,7 @@ func (s *queueFailoverAckMgrSuite) TestReadCompleteTimerTasks() {
s.mockProcessor.On("completeTask", taskID2).Return(nil).Once()
s.queueFailoverAckMgr.completeQueueTask(taskID2)
s.Equal(map[int64]bool{taskID1: false, taskID2: true}, s.queueFailoverAckMgr.outstandingTasks)
s.mockProcessor.On("updateAckLevel", s.queueFailoverAckMgr.getQueueAckLevel()).Return(nil)
s.queueFailoverAckMgr.updateQueueAckLevel()
select {
case <-s.queueFailoverAckMgr.getFinishedChan():
@@ -460,6 +461,7 @@ func (s *queueFailoverAckMgrSuite) TestReadCompleteTimerTasks() {
s.mockProcessor.On("completeTask", taskID1).Return(nil).Once()
s.queueFailoverAckMgr.completeQueueTask(taskID1)
s.Equal(map[int64]bool{taskID1: true, taskID2: true}, s.queueFailoverAckMgr.outstandingTasks)
s.mockProcessor.On("queueShutdown").Return(nil)
s.queueFailoverAckMgr.updateQueueAckLevel()
select {
case <-s.queueFailoverAckMgr.getFinishedChan():
11 changes: 6 additions & 5 deletions service/history/queueProcessor.go
@@ -156,7 +156,10 @@ func (p *queueProcessorBase) processorPump() {

	jitter := backoff.NewJitter()
	pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
-	updateAckTimer := time.NewTimer(p.options.UpdateAckInterval())
+	defer pollTimer.Stop()
+
+	updateAckTicker := time.NewTicker(p.options.UpdateAckInterval())
+	defer updateAckTicker.Stop()

processorPumpLoop:
	for {
@@ -173,9 +176,8 @@ processorPumpLoop:
			if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
				p.processBatch(tasksCh)
			}
-		case <-updateAckTimer.C:
+		case <-updateAckTicker.C:
			p.ackMgr.updateQueueAckLevel()
-			updateAckTimer = time.NewTimer(p.options.UpdateAckInterval())
		}
	}

@@ -185,8 +187,7 @@
	if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
		p.logger.Warn("Queue processor timed out on worker shutdown.")
	}
-	updateAckTimer.Stop()
-	pollTimer.Stop()

}
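Two small lifecycle fixes land here: pollTimer now gets a deferred Stop, and the one-shot update-ack timer, which had to be re-created after every fire, becomes a time.Ticker that fires repeatedly and is released on every exit path by a single deferred Stop. A standalone illustration of the ticker pattern, with a placeholder interval and loop body:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// a ticker fires repeatedly without being re-created each iteration
	updateAckTicker := time.NewTicker(50 * time.Millisecond)
	defer updateAckTicker.Stop() // one Stop covers every exit path

	done := time.After(200 * time.Millisecond)
	for {
		select {
		case <-updateAckTicker.C:
			fmt.Println("update ack level") // stands in for p.ackMgr.updateQueueAckLevel()
		case <-done:
			return
		}
	}
}
```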

func (p *queueProcessorBase) processBatch(tasksCh chan<- queueTaskInfo) {
5 changes: 5 additions & 0 deletions service/history/replicatorQueueProcessor.go
@@ -126,6 +126,11 @@ func (p *replicatorQueueProcessorImpl) process(qTask queueTaskInfo) error {
	return err
}

func (p *replicatorQueueProcessorImpl) queueShutdown() error {
	// there is no shutdown-specific behavior for the replication queue
	return nil
}

func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persistence.ReplicationTaskInfo) error {
	p.metricsClient.IncCounter(metrics.ReplicatorTaskHistoryScope, metrics.TaskRequests)
	sw := p.metricsClient.StartTimer(metrics.ReplicatorTaskHistoryScope, metrics.TaskLatency)
70 changes: 70 additions & 0 deletions service/history/shardContext.go
@@ -58,6 +58,12 @@ type (
		UpdateTimerAckLevel(ackLevel time.Time) error
		GetTimerClusterAckLevel(cluster string) time.Time
		UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
		UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error
		DeleteTransferFailoverLevel(failoverID string) error
		GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel
		UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error
		DeleteTimerFailoverLevel(failoverID string) error
		GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel
		GetDomainNotificationVersion() int64
		UpdateDomainNotificationVersion(domainNotificationVersion int64) error
		CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
@@ -232,6 +238,60 @@ func (s *shardContextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
	return s.updateShardInfoLocked()
}

func (s *shardContextImpl) UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error {
	s.Lock()
	defer s.Unlock()

	s.shardInfo.TransferFailoverLevels[failoverID] = level
	return s.updateShardInfoLocked()
}

func (s *shardContextImpl) DeleteTransferFailoverLevel(failoverID string) error {
	s.Lock()
	defer s.Unlock()

	delete(s.shardInfo.TransferFailoverLevels, failoverID)
	return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel {
	s.RLock()
	defer s.RUnlock()

	ret := map[string]persistence.TransferFailoverLevel{}
	for k, v := range s.shardInfo.TransferFailoverLevels {
		ret[k] = v
	}
	return ret
}

func (s *shardContextImpl) UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error {
	s.Lock()
	defer s.Unlock()

	s.shardInfo.TimerFailoverLevels[failoverID] = level
	return s.updateShardInfoLocked()
}

func (s *shardContextImpl) DeleteTimerFailoverLevel(failoverID string) error {
	s.Lock()
	defer s.Unlock()

	delete(s.shardInfo.TimerFailoverLevels, failoverID)
	return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel {
	s.RLock()
	defer s.RUnlock()

	ret := map[string]persistence.TimerFailoverLevel{}
	for k, v := range s.shardInfo.TimerFailoverLevels {
		ret[k] = v
	}
	return ret
}

func (s *shardContextImpl) GetDomainNotificationVersion() int64 {
	s.RLock()
	defer s.RUnlock()
@@ -756,6 +816,14 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha
}

func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo {
	transferFailoverLevels := map[string]persistence.TransferFailoverLevel{}
	for k, v := range shardInfo.TransferFailoverLevels {
		transferFailoverLevels[k] = v
	}
	timerFailoverLevels := map[string]persistence.TimerFailoverLevel{}
	for k, v := range shardInfo.TimerFailoverLevels {
		timerFailoverLevels[k] = v
	}
	clusterTransferAckLevel := make(map[string]int64)
	for k, v := range shardInfo.ClusterTransferAckLevel {
		clusterTransferAckLevel[k] = v
@@ -772,6 +840,8 @@ func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo {
		ReplicationAckLevel:       shardInfo.ReplicationAckLevel,
		TransferAckLevel:          shardInfo.TransferAckLevel,
		TimerAckLevel:             shardInfo.TimerAckLevel,
		TransferFailoverLevels:    transferFailoverLevels,
		TimerFailoverLevels:       timerFailoverLevels,
		ClusterTransferAckLevel:   clusterTransferAckLevel,
		ClusterTimerAckLevel:      clusterTimerAckLevel,
		DomainNotificationVersion: shardInfo.DomainNotificationVersion,
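copyShardInfo copies the two new maps entry by entry for the same reason it already copied the cluster ack-level maps: assigning a Go map copies only the map header, so a shallow copy would leave two ShardInfo snapshots sharing, and mutating, the same entries. A standalone illustration:

```go
package main

import "fmt"

func main() {
	original := map[string]int64{"failover-uuid": 100}

	alias := original           // shares storage with original
	clone := map[string]int64{} // independent copy, as copyShardInfo builds
	for k, v := range original {
		clone[k] = v
	}

	alias["failover-uuid"] = 200
	fmt.Println(original["failover-uuid"]) // 200: the alias mutated it
	fmt.Println(clone["failover-uuid"])    // 100: the clone is unaffected
}
```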
8 changes: 8 additions & 0 deletions service/history/shardController_test.go
@@ -159,6 +159,8 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() {
				cluster.TestCurrentClusterName:     currentClusterTimerAck,
				cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
			},
			TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
			TimerFailoverLevels:    map[string]persistence.TimerFailoverLevel{},
		},
		PreviousRangeID: 5,
	}).Return(nil).Once()
@@ -247,6 +249,8 @@ func (s *shardControllerSuite) TestAcquireShardRenewSuccess() {
				cluster.TestCurrentClusterName:     currentClusterTimerAck,
				cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
			},
			TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
			TimerFailoverLevels:    map[string]persistence.TimerFailoverLevel{},
		},
		PreviousRangeID: 5,
	}).Return(nil).Once()
@@ -320,6 +324,8 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() {
				cluster.TestCurrentClusterName:     currentClusterTimerAck,
				cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
			},
			TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
			TimerFailoverLevels:    map[string]persistence.TimerFailoverLevel{},
		},
		PreviousRangeID: 5,
	}).Return(nil).Once()
@@ -614,6 +620,8 @@ func (s *shardControllerSuite) setupMocksForAcquireShard(shardID int, mockEngine
				cluster.TestCurrentClusterName:     currentClusterTimerAck,
				cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
			},
			TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
			TimerFailoverLevels:    map[string]persistence.TimerFailoverLevel{},
		},
		PreviousRangeID: currentRangeID,
	}).Return(nil).Once()