Skip to content

Commit

Permalink
Priority Task Processor Improvements (uber#3284)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 27, 2020
1 parent da4cffa commit 34bac76
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 30 deletions.
13 changes: 13 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ const (
retryKafkaOperationMaxInterval = 10 * time.Second
retryKafkaOperationExpirationInterval = 30 * time.Second

// Retry policy knobs for task processing.
// NOTE: the max interval must be >= the initial interval; it previously read
// 100 * time.Microsecond (0.1ms), which is below the 50ms initial interval
// and would effectively cap every retry delay at 0.1ms. Millisecond is the
// intended unit, consistent with the other retry constants in this block.
retryTaskProcessingInitialInterval = 50 * time.Millisecond
retryTaskProcessingMaxInterval = 100 * time.Millisecond
retryTaskProcessingMaxAttempts = 3

contextExpireThreshold = 10 * time.Millisecond

// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
Expand Down Expand Up @@ -176,6 +180,15 @@ func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateTaskProcessingRetryPolicy creates a retry policy for task processing.
// It is an exponential policy bounded by retryTaskProcessingMaxInterval and
// capped at retryTaskProcessingMaxAttempts attempts.
func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy {
	p := backoff.NewExponentialRetryPolicy(retryTaskProcessingInitialInterval)
	p.SetMaximumInterval(retryTaskProcessingMaxInterval)
	p.SetMaximumAttempts(retryTaskProcessingMaxAttempts)
	return p
}

// IsPersistenceTransientError checks if the error is a transient persistence error
func IsPersistenceTransientError(err error) bool {
switch err.(type) {
Expand Down
4 changes: 2 additions & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ func (h *Handler) Start() {
processorOptions.FifoSchedulerOptions = &t.FIFOTaskSchedulerOptions{
QueueSize: h.config.TaskSchedulerQueueSize(),
WorkerCount: h.config.TaskSchedulerWorkerCount(),
RetryPolicy: common.CreatePersistanceRetryPolicy(),
RetryPolicy: common.CreateTaskProcessingRetryPolicy(),
}
case t.SchedulerTypeWRR:
processorOptions.WRRSchedulerOptions = &t.WeightedRoundRobinTaskSchedulerOptions{
Weights: h.config.TaskSchedulerRoundRobinWeights,
QueueSize: h.config.TaskSchedulerQueueSize(),
WorkerCount: h.config.TaskSchedulerWorkerCount(),
RetryPolicy: common.CreatePersistanceRetryPolicy(),
RetryPolicy: common.CreateTaskProcessingRetryPolicy(),
}
default:
h.GetLogger().Fatal("Unknown task scheduler type", tag.Value(schedulerType))
Expand Down
2 changes: 1 addition & 1 deletion service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newQueueProcessorBase(
) *queueProcessorBase {

var taskProcessor *taskProcessor
if !options.EnablePriorityTaskProcessor() {
if queueTaskProcessor == nil || !options.EnablePriorityTaskProcessor() {
taskProcessorOptions := taskProcessorOptions{
queueSize: options.BatchSize(),
workerCount: options.WorkerCount(),
Expand Down
14 changes: 9 additions & 5 deletions service/history/task/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ type (
)

const (
	// Values start at iota + 1 so the QueueType zero value does not
	// collide with any valid queue type.

	// QueueTypeActiveTransfer is the queue type for active transfer queue processor
	QueueTypeActiveTransfer QueueType = iota + 1
	// QueueTypeStandbyTransfer is the queue type for standby transfer queue processor
	QueueTypeStandbyTransfer
	// QueueTypeActiveTimer is the queue type for active timer queue processor
	QueueTypeActiveTimer
	// QueueTypeStandbyTimer is the queue type for standby timer queue processor
	QueueTypeStandbyTimer
	// QueueTypeReplication is the queue type for replication queue processor
	QueueTypeReplication
)
24 changes: 19 additions & 5 deletions service/history/task/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,40 @@ func NewPriorityAssigner(
func (a *priorityAssignerImpl) Assign(
queueTask Task,
) error {
if queueTask.GetQueueType() == QueueTypeReplication {
queueType := queueTask.GetQueueType()

if queueType == QueueTypeReplication {
queueTask.SetPriority(task.GetTaskPriority(task.LowPriorityClass, task.DefaultPrioritySubclass))
return nil
}

// timer of transfer task, first check if domain is active or not
domainName, active, err := a.getDomainInfo(queueTask.GetDomainID())
// timer or transfer task, first check if task is active or not and if domain is active or not
isActiveTask := queueType == QueueTypeActiveTimer || queueType == QueueTypeActiveTransfer
domainName, isActiveDomain, err := a.getDomainInfo(queueTask.GetDomainID())
if err != nil {
return err
}

if !active {
// there are four cases here:
// 1. active task for active domain
// 2. active task for standby domain
// 3. standby task for active domain
// 4. standby task for standby domain

if !isActiveTask && !isActiveDomain {
// only assign low priority to tasks in the fourth case
queueTask.SetPriority(task.GetTaskPriority(task.LowPriorityClass, task.DefaultPrioritySubclass))
return nil
}

// for case 1 we should give the task a high priority
// for case 2 and 3 the task will be a no-op in most cases, also give it a high priority so that
// it can be quickly verified/acked and won't prevent the ack level in the processor from advancing
// (especially for active processor)
if !a.getRateLimiter(domainName).Allow() {
queueTask.SetPriority(task.GetTaskPriority(task.DefaultPriorityClass, task.DefaultPrioritySubclass))
taggedScope := a.scope.Tagged(metrics.DomainTag(domainName))
if queueTask.GetQueueType() == QueueTypeTransfer {
if queueType == QueueTypeActiveTransfer || queueType == QueueTypeStandbyTransfer {
taggedScope.IncCounter(metrics.TransferTaskThrottledCounter)
} else {
taggedScope.IncCounter(metrics.TimerTaskThrottledCounter)
Expand Down
42 changes: 35 additions & 7 deletions service/history/task/priority_assigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,39 +146,67 @@ func (s *taskPriorityAssignerSuite) TestAssign_ReplicationTask() {
s.NoError(err)
}

func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask() {
func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask_StandbyDomain() {
constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestAlternativeClusterName
defer func() {
constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestCurrentClusterName
}()
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil)

mockTask := NewMockTask(s.controller)
mockTask.EXPECT().GetQueueType().Return(QueueTypeTransfer).Times(1)
mockTask.EXPECT().GetQueueType().Return(QueueTypeStandbyTransfer).AnyTimes()
mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
mockTask.EXPECT().SetPriority(task.GetTaskPriority(task.LowPriorityClass, task.DefaultPrioritySubclass)).Times(1)

err := s.priorityAssigner.Assign(mockTask)
s.NoError(err)
}

func (s *taskPriorityAssignerSuite) TestAssign_TransferTask() {
func (s *taskPriorityAssignerSuite) TestAssign_StandbyTask_ActiveDomain() {
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil)

mockTask := NewMockTask(s.controller)
mockTask.EXPECT().GetQueueType().Return(QueueTypeTransfer).AnyTimes()
mockTask.EXPECT().GetQueueType().Return(QueueTypeStandbyTransfer).AnyTimes()
mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
mockTask.EXPECT().SetPriority(task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass)).Times(1)

err := s.priorityAssigner.Assign(mockTask)
s.NoError(err)
}

func (s *taskPriorityAssignerSuite) TestAssign_TimerTask() {
func (s *taskPriorityAssignerSuite) TestAssign_ActiveTask_StandbyDomain() {
constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestAlternativeClusterName
defer func() {
constants.TestGlobalDomainEntry.GetReplicationConfig().ActiveClusterName = cluster.TestCurrentClusterName
}()
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil)

mockTask := NewMockTask(s.controller)
mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTimer).AnyTimes()
mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
mockTask.EXPECT().SetPriority(task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass)).Times(1)

err := s.priorityAssigner.Assign(mockTask)
s.NoError(err)
}

func (s *taskPriorityAssignerSuite) TestAssign_ActiveTransferTask_ActiveDomain() {
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil)

mockTask := NewMockTask(s.controller)
mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTransfer).AnyTimes()
mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
mockTask.EXPECT().SetPriority(task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass)).Times(1)

err := s.priorityAssigner.Assign(mockTask)
s.NoError(err)
}

func (s *taskPriorityAssignerSuite) TestAssign_ActiveTimerTask_ActiveDomain() {
s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil)

mockTask := NewMockTask(s.controller)
mockTask.EXPECT().GetQueueType().Return(QueueTypeTimer).AnyTimes()
mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTimer).AnyTimes()
mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
mockTask.EXPECT().SetPriority(task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass)).Times(1)

Expand All @@ -191,7 +219,7 @@ func (s *taskPriorityAssignerSuite) TestAssign_ThrottledTask() {

for i := 0; i != s.testTaskProcessRPS*2; i++ {
mockTask := NewMockTask(s.controller)
mockTask.EXPECT().GetQueueType().Return(QueueTypeTimer).AnyTimes()
mockTask.EXPECT().GetQueueType().Return(QueueTypeActiveTimer).AnyTimes()
mockTask.EXPECT().GetDomainID().Return(constants.TestDomainID).Times(1)
if i < s.testTaskProcessRPS {
mockTask.EXPECT().SetPriority(task.GetTaskPriority(task.HighPriorityClass, task.DefaultPrioritySubclass)).Times(1)
Expand Down
21 changes: 12 additions & 9 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ type (
taskExecutor Executor
maxRetryCount dynamicconfig.IntPropertyFn

// TODO: following two fields should be removed after new task lifecycle is implemented
// TODO: following three fields should be removed after new task lifecycle is implemented
taskFilter Filter
queueType QueueType
shouldProcessTask bool
}

Expand All @@ -102,6 +103,7 @@ type (
func NewTimerTask(
shard shard.Context,
taskInfo Info,
queueType QueueType,
scope metrics.Scope,
logger log.Logger,
taskFilter Filter,
Expand All @@ -115,6 +117,7 @@ func NewTimerTask(
taskBase: newQueueTaskBase(
shard,
taskInfo,
queueType,
scope,
logger,
taskFilter,
Expand All @@ -131,6 +134,7 @@ func NewTimerTask(
func NewTransferTask(
shard shard.Context,
taskInfo Info,
queueType QueueType,
scope metrics.Scope,
logger log.Logger,
taskFilter Filter,
Expand All @@ -144,6 +148,7 @@ func NewTransferTask(
taskBase: newQueueTaskBase(
shard,
taskInfo,
queueType,
scope,
logger,
taskFilter,
Expand All @@ -159,6 +164,7 @@ func NewTransferTask(
func newQueueTaskBase(
shard shard.Context,
queueTaskInfo Info,
queueType QueueType,
scope metrics.Scope,
logger log.Logger,
taskFilter Filter,
Expand All @@ -170,6 +176,7 @@ func newQueueTaskBase(
Info: queueTaskInfo,
shard: shard,
state: ctask.TaskStatePending,
queueType: queueType,
scope: scope,
logger: logger,
attempt: 0,
Expand Down Expand Up @@ -199,10 +206,6 @@ func (t *timerTask) Nack() {
t.redispatchQueue.Add(t)
}

func (t *timerTask) GetQueueType() QueueType {
return QueueTypeTimer
}

func (t *transferTask) Ack() {
t.taskBase.Ack()

Expand All @@ -217,10 +220,6 @@ func (t *transferTask) Nack() {
t.redispatchQueue.Add(t)
}

func (t *transferTask) GetQueueType() QueueType {
return QueueTypeTransfer
}

func (t *taskBase) Execute() error {
// TODO: after merging active and standby queues,
// the task should be smart enough to tell if it should be
Expand Down Expand Up @@ -361,3 +360,7 @@ func (t *taskBase) GetAttempt() int {

return t.attempt
}

// GetQueueType returns the queue type recorded on the task when it was
// constructed in newQueueTaskBase.
func (t *taskBase) GetQueueType() QueueType {
return t.queueType
}
1 change: 1 addition & 0 deletions service/history/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (s *queueTaskSuite) newTestQueueTaskBase(
return newQueueTaskBase(
s.mockShard,
s.mockQueueTaskInfo,
QueueTypeActiveTransfer,
s.scope,
s.logger,
taskFilter,
Expand Down
2 changes: 2 additions & 0 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func newTimerQueueActiveProcessor(
return task.NewTimerTask(
shard,
taskInfo,
task.QueueTypeActiveTimer,
historyService.metricsClient.Scope(
task.GetTimerTaskMetricScope(taskInfo.GetTaskType(), true),
),
Expand Down Expand Up @@ -225,6 +226,7 @@ func newTimerQueueFailoverProcessor(
return task.NewTimerTask(
shard,
taskInfo,
task.QueueTypeActiveTimer,
historyService.metricsClient.Scope(
task.GetTimerTaskMetricScope(taskInfo.GetTaskType(), true),
),
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newTimerQueueProcessorBase(
config := shard.GetConfig()

var taskProcessor *taskProcessor
if !config.TimerProcessorEnablePriorityTaskProcessor() {
if queueTaskProcessor == nil || !config.TimerProcessorEnablePriorityTaskProcessor() {
options := taskProcessorOptions{
workerCount: config.TimerTaskWorkerCount(),
queueSize: config.TimerTaskWorkerCount() * config.TimerTaskBatchSize(),
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func newTimerQueueStandbyProcessor(
return task.NewTimerTask(
shard,
taskInfo,
task.QueueTypeStandbyTimer,
historyService.metricsClient.Scope(
task.GetTimerTaskMetricScope(taskInfo.GetTaskType(), false),
),
Expand Down
2 changes: 2 additions & 0 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func newTransferQueueActiveProcessor(
return task.NewTransferTask(
shard,
taskInfo,
task.QueueTypeActiveTransfer,
historyService.metricsClient.Scope(
task.GetTransferTaskMetricsScope(taskInfo.GetTaskType(), true),
),
Expand Down Expand Up @@ -278,6 +279,7 @@ func newTransferQueueFailoverProcessor(
return task.NewTransferTask(
shard,
taskInfo,
task.QueueTypeActiveTransfer,
historyService.metricsClient.Scope(
task.GetTransferTaskMetricsScope(taskInfo.GetTaskType(), true),
),
Expand Down
1 change: 1 addition & 0 deletions service/history/transferQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func newTransferQueueStandbyProcessor(
return task.NewTransferTask(
shard,
taskInfo,
task.QueueTypeStandbyTransfer,
historyService.metricsClient.Scope(
task.GetTransferTaskMetricsScope(taskInfo.GetTaskType(), false),
),
Expand Down

0 comments on commit 34bac76

Please sign in to comment.