Wire up priority task processor implementation (#3146)
* Use ShardContext as the key for worker pool
yycptt committed Mar 31, 2020
1 parent a3e60fa commit 97e0c83
Showing 30 changed files with 756 additions and 349 deletions.
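Only a subset of the 30 changed files is expanded on this page; the history-service wiring, including the commit message's headline change of keying the worker pool by ShardContext, lives in the collapsed files. As a rough illustration of that idea only, the sketch below uses hypothetical ShardContext, workerPool, and poolRegistry types; none of these names are taken from the commit itself.

package example

import "sync"

// ShardContext stands in for the history service's per-shard context;
// the real interface is defined in files not expanded on this page.
type ShardContext interface {
    GetShardID() int
}

// workerPool stands in for a pool of task-processing workers.
type workerPool struct {
    workerCount int
}

// poolRegistry keys pools by the ShardContext value itself, so each shard
// owns exactly one pool and the pool's lifetime follows the shard's.
type poolRegistry struct {
    mu    sync.Mutex
    pools map[ShardContext]*workerPool
}

func (r *poolRegistry) getOrCreatePool(shard ShardContext, workerCount int) *workerPool {
    r.mu.Lock()
    defer r.mu.Unlock()
    if pool, ok := r.pools[shard]; ok {
        return pool
    }
    pool := &workerPool{workerCount: workerCount}
    r.pools[shard] = pool
    return pool
}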
15 changes: 15 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -69,6 +69,7 @@ var keys = map[Key]string{
EnableBatcher: "worker.enableBatcher",
EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker",
EnableStickyQuery: "system.enableStickyQuery",
+ EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
@@ -145,6 +146,10 @@ var keys = map[Key]string{
StandbyTaskMissingEventsResendDelay: "history.standbyTaskMissingEventsResendDelay",
StandbyTaskMissingEventsDiscardDelay: "history.standbyTaskMissingEventsDiscardDelay",
TaskProcessRPS: "history.taskProcessRPS",
+ TaskSchedulerType: "history.taskSchedulerType",
+ TaskSchedulerWorkerCount: "history.taskSchedulerWorkerCount",
+ TaskSchedulerQueueSize: "history.taskSchedulerQueueSize",
+ TaskSchedulerRoundRobinWeights: "history.taskSchedulerRoundRobinWeight",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
@@ -311,6 +316,8 @@ const (
MaxDecisionStartToCloseSeconds
// DisallowQuery is the key to disallow query for a domain
DisallowQuery
+ // EnablePriorityTaskProcessor is the key for enabling the priority task processor
+ EnablePriorityTaskProcessor

// BlobSizeLimitError is the per event blob size limit
BlobSizeLimitError
@@ -458,6 +465,14 @@ const (
StandbyTaskMissingEventsDiscardDelay
// TaskProcessRPS is the task processing rate per second for each domain
TaskProcessRPS
+ // TaskSchedulerType is the task scheduler type for the priority task processor
+ TaskSchedulerType
+ // TaskSchedulerWorkerCount is the number of workers per shard in the task scheduler
+ TaskSchedulerWorkerCount
+ // TaskSchedulerQueueSize is the size of the task channel in the task scheduler
+ TaskSchedulerQueueSize
+ // TaskSchedulerRoundRobinWeights is the map from task priority to weight for the weighted round robin task scheduler
+ TaskSchedulerRoundRobinWeights
// TimerTaskBatchSize is batch size for timer processor to process tasks
TimerTaskBatchSize
// TimerTaskWorkerCount is number of task workers for timer processor
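For orientation, the processor toggle and the four new scheduler keys might be consumed through a dynamicconfig.Collection as in the sketch below. This is a minimal sketch, assuming the Collection constructor and accessors (GetBoolProperty, GetIntProperty, GetMapProperty) exposed elsewhere in this package; the holder type, function name, and default values are illustrative, not the ones this commit actually picks.

package config

import (
    "github.com/uber/cadence/common/log"
    "github.com/uber/cadence/common/service/dynamicconfig"
)

// taskProcessingConfig is an illustrative holder for the new settings.
type taskProcessingConfig struct {
    EnablePriorityTaskProcessor    dynamicconfig.BoolPropertyFn
    TaskSchedulerType              dynamicconfig.IntPropertyFn
    TaskSchedulerWorkerCount       dynamicconfig.IntPropertyFn
    TaskSchedulerQueueSize         dynamicconfig.IntPropertyFn
    TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
}

func newTaskProcessingConfig(client dynamicconfig.Client, logger log.Logger) *taskProcessingConfig {
    dc := dynamicconfig.NewCollection(client, logger)
    return &taskProcessingConfig{
        EnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.EnablePriorityTaskProcessor, false),
        TaskSchedulerType:           dc.GetIntProperty(dynamicconfig.TaskSchedulerType, 2),
        TaskSchedulerWorkerCount:    dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount, 200),
        TaskSchedulerQueueSize:      dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize, 2000),
        TaskSchedulerRoundRobinWeights: dc.GetMapProperty(
            dynamicconfig.TaskSchedulerRoundRobinWeights,
            map[string]interface{}{"0": 5, "1": 3, "2": 1}, // sample priority -> weight map
        ),
    }
}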
6 changes: 3 additions & 3 deletions common/task/fifoTaskScheduler.go
@@ -58,18 +58,18 @@ type (
// This scheduler is only for development purposes.
func NewFIFOTaskScheduler(
logger log.Logger,
- metricsScope metrics.Scope,
+ metricsClient metrics.Client,
options *FIFOTaskSchedulerOptions,
) Scheduler {
return &fifoTaskSchedulerImpl{
status: common.DaemonStatusInitialized,
logger: logger,
- metricsScope: metricsScope,
+ metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
taskCh: make(chan PriorityTask, options.QueueSize),
shutdownCh: make(chan struct{}),
processor: NewParallelTaskProcessor(
logger,
- metricsScope,
+ metricsClient,
&ParallelTaskProcessorOptions{
QueueSize: options.QueueSize,
WorkerCount: options.WorkerCount,
2 changes: 1 addition & 1 deletion common/task/fifoTaskScheduler_test.go
@@ -63,7 +63,7 @@ func (s *fifoTaskSchedulerSuite) SetupTest() {
s.queueSize = 2
s.scheduler = NewFIFOTaskScheduler(
loggerimpl.NewDevelopmentForTest(s.Suite),
- metrics.NewClient(tally.NoopScope, metrics.Common).Scope(metrics.TaskSchedulerScope),
+ metrics.NewClient(tally.NoopScope, metrics.Common),
&FIFOTaskSchedulerOptions{
QueueSize: s.queueSize,
WorkerCount: 1,
4 changes: 2 additions & 2 deletions common/task/parallelTaskProcessor.go
@@ -59,15 +59,15 @@ var (
// NewParallelTaskProcessor creates a new Processor that executes tasks in parallel
func NewParallelTaskProcessor(
logger log.Logger,
- metricsScope metrics.Scope,
+ metricsClient metrics.Client,
options *ParallelTaskProcessorOptions,
) Processor {
return &parallelTaskProcessorImpl{
status: common.DaemonStatusInitialized,
tasksCh: make(chan Task, options.QueueSize),
shutdownCh: make(chan struct{}),
logger: logger,
- metricsScope: metricsScope,
+ metricsScope: metricsClient.Scope(metrics.ParallelTaskProcessingScope),
options: options,
}
}
2 changes: 1 addition & 1 deletion common/task/parallelTaskProcessor_test.go
@@ -65,7 +65,7 @@ func (s *parallelTaskProcessorSuite) SetupTest() {

s.processor = NewParallelTaskProcessor(
loggerimpl.NewDevelopmentForTest(s.Suite),
- metrics.NewClient(tally.NoopScope, metrics.Common).Scope(metrics.ParallelTaskProcessingScope),
+ metrics.NewClient(tally.NoopScope, metrics.Common),
&ParallelTaskProcessorOptions{
QueueSize: 0,
WorkerCount: 1,
39 changes: 20 additions & 19 deletions common/task/sequentialTaskProcessor.go
@@ -25,12 +25,11 @@ import (
"sync/atomic"
"time"

"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
)

type (
@@ -44,15 +43,19 @@ type (
taskQueueFactory SequentialTaskQueueFactory
taskqueueChan chan SequentialTaskQueue

- metricsScope int
- metricsClient metrics.Client
- logger log.Logger
+ metricsScope metrics.Scope
+ logger log.Logger
}
)

// NewSequentialTaskProcessor creates a new sequential task processor
- func NewSequentialTaskProcessor(coroutineSize int, taskQueueHashFn collection.HashFunc, taskQueueFactory SequentialTaskQueueFactory,
- metricsClient metrics.Client, logger log.Logger) Processor {
+ func NewSequentialTaskProcessor(
+ coroutineSize int,
+ taskQueueHashFn collection.HashFunc,
+ taskQueueFactory SequentialTaskQueueFactory,
+ metricsClient metrics.Client,
+ logger log.Logger,
+ ) Processor {

return &sequentialTaskProcessorImpl{
status: common.DaemonStatusInitialized,
@@ -61,10 +64,8 @@ func NewSequentialTaskProcessor(coroutineSize int, taskQueueHashFn collection.Ha
taskqueues: collection.NewShardedConcurrentTxMap(1024, taskQueueHashFn),
taskQueueFactory: taskQueueFactory,
taskqueueChan: make(chan SequentialTaskQueue, coroutineSize),
-
- metricsScope: metrics.SequentialTaskProcessingScope,
- metricsClient: metricsClient,
- logger: logger,
+ metricsScope: metricsClient.Scope(metrics.SequentialTaskProcessingScope),
+ logger: logger,
}
}

@@ -94,8 +95,8 @@ func (t *sequentialTaskProcessorImpl) Stop() {

func (t *sequentialTaskProcessorImpl) Submit(task Task) error {

- t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequest)
- metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskSubmitLatency)
+ t.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequest)
+ metricsTimer := t.metricsScope.StartTimer(metrics.SequentialTaskSubmitLatency)
defer metricsTimer.Stop()

taskqueue := t.taskQueueFactory(task)
@@ -116,12 +117,12 @@ func (t *sequentialTaskProcessorImpl) Submit(task Task) error {
// if the function was evaluated, the task set has
// already been dispatched
if fnEvaluated {
- t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequestTaskQueueExist)
+ t.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequestTaskQueueExist)
return nil
}

// need to dispatch this task set
- t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequestTaskQueueMissing)
+ t.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequestTaskQueueMissing)
select {
case <-t.shutdownChan:
case t.taskqueueChan <- taskqueue:
@@ -138,7 +139,7 @@ func (t *sequentialTaskProcessorImpl) pollAndProcessTaskQueue() {
case <-t.shutdownChan:
return
case taskqueue := <-t.taskqueueChan:
- metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskQueueProcessingLatency)
+ metricsTimer := t.metricsScope.StartTimer(metrics.SequentialTaskQueueProcessingLatency)
t.processTaskQueue(taskqueue)
metricsTimer.Stop()
}
@@ -152,7 +153,7 @@ func (t *sequentialTaskProcessorImpl) processTaskQueue(taskqueue SequentialTaskQ
return
default:
queueSize := taskqueue.Len()
- t.metricsClient.RecordTimer(t.metricsScope, metrics.SequentialTaskQueueSize, time.Duration(queueSize))
+ t.metricsScope.RecordTimer(metrics.SequentialTaskQueueSize, time.Duration(queueSize))
if queueSize > 0 {
t.processTaskOnce(taskqueue)
} else {
Expand All @@ -171,7 +172,7 @@ func (t *sequentialTaskProcessorImpl) processTaskQueue(taskqueue SequentialTaskQ
}

func (t *sequentialTaskProcessorImpl) processTaskOnce(taskqueue SequentialTaskQueue) {
- metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskTaskProcessingLatency)
+ metricsTimer := t.metricsScope.StartTimer(metrics.SequentialTaskTaskProcessingLatency)
defer metricsTimer.Stop()

task := taskqueue.Remove()
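The same mechanical change runs through all three processors above: constructors now accept a metrics.Client and bind a metrics.Scope once, instead of storing the client plus an integer scope index and passing both on every metrics call. A minimal before/after sketch of the pattern follows, using hypothetical processorBefore/processorAfter names; the metric identifiers are the ones used in the diff above.

package task

import "github.com/uber/cadence/common/metrics"

// processorBefore: each call site names both the client and the scope index.
type processorBefore struct {
    metricsClient metrics.Client
    metricsScope  int // e.g. metrics.SequentialTaskProcessingScope
}

func (p *processorBefore) submit() {
    p.metricsClient.IncCounter(p.metricsScope, metrics.SequentialTaskSubmitRequest)
}

// processorAfter: the scope is bound once at construction,
// so call sites name only the metric.
type processorAfter struct {
    metricsScope metrics.Scope
}

func newProcessorAfter(metricsClient metrics.Client) *processorAfter {
    return &processorAfter{
        metricsScope: metricsClient.Scope(metrics.SequentialTaskProcessingScope),
    }
}

func (p *processorAfter) submit() {
    p.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequest)
}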
74 changes: 64 additions & 10 deletions common/task/weightedRoundRobinTaskScheduler.go
@@ -23,6 +23,8 @@ package task
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -38,7 +40,7 @@ import (
type (
// WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
WeightedRoundRobinTaskSchedulerOptions struct {
- Weights map[int]dynamicconfig.IntPropertyFn
+ Weights dynamicconfig.MapPropertyFn
QueueSize int
WorkerCount int
RetryPolicy backoff.RetryPolicy
@@ -48,6 +50,7 @@ type (
sync.RWMutex

status int32
+ weights atomic.Value // store the currently used weights
taskChs map[int]chan PriorityTask
shutdownCh chan struct{}
notifyCh chan struct{}
@@ -61,7 +64,8 @@ type (
)

const (
wRRTaskProcessorQueueSize = 1
+ defaultUpdateWeightsInterval = 5 * time.Second
)

var (
@@ -72,31 +76,39 @@ var (
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
func NewWeightedRoundRobinTaskScheduler(
logger log.Logger,
- metricsScope metrics.Scope,
+ metricsClient metrics.Client,
options *WeightedRoundRobinTaskSchedulerOptions,
) (Scheduler, error) {
- if len(options.Weights) == 0 {
+ weights, err := convertWeightsFromDynamicConfig(options.Weights())
+ if err != nil {
+ return nil, err
+ }
+
+ if len(weights) == 0 {
return nil, errors.New("weight is not specified in the scheduler option")
}

- return &weightedRoundRobinTaskSchedulerImpl{
+ scheduler := &weightedRoundRobinTaskSchedulerImpl{
status: common.DaemonStatusInitialized,
taskChs: make(map[int]chan PriorityTask),
shutdownCh: make(chan struct{}),
notifyCh: make(chan struct{}, 1),
logger: logger,
- metricsScope: metricsScope,
+ metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
options: options,
processor: NewParallelTaskProcessor(
logger,
- metricsScope,
+ metricsClient,
&ParallelTaskProcessorOptions{
QueueSize: wRRTaskProcessorQueueSize,
WorkerCount: options.WorkerCount,
RetryPolicy: options.RetryPolicy,
},
),
- }, nil
+ }
+ scheduler.weights.Store(weights)
+
+ return scheduler, nil
}

func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
@@ -108,6 +120,7 @@ func (w *weightedRoundRobinTaskSchedulerImpl) Start() {

w.dispatcherWG.Add(1)
go w.dispatcher()
+ go w.updateWeights()

w.logger.Info("Weighted round robin task scheduler started.")
}
@@ -187,8 +200,9 @@ func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {

outstandingTasks = false
w.updateTaskChs(taskChs)
+ weights := w.getWeights()
for priority, taskCh := range taskChs {
- for i := 0; i < w.options.Weights[priority](); i++ {
+ for i := 0; i < weights[priority]; i++ {
select {
case task := <-taskCh:
// dispatched at least one task in this round
@@ -212,7 +226,7 @@
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(
priority int,
) (chan PriorityTask, error) {
- if _, ok := w.options.Weights[priority]; !ok {
+ if _, ok := w.getWeights()[priority]; !ok {
return nil, fmt.Errorf("unknown task priority: %v", priority)
}

@@ -252,3 +266,43 @@ func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
// do not block if there's already a notification
}
}

+ func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
+ return w.weights.Load().(map[int]int)
+ }
+
+ func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
+ ticker := time.NewTicker(defaultUpdateWeightsInterval)
+ for {
+ select {
+ case <-ticker.C:
+ weights, err := convertWeightsFromDynamicConfig(w.options.Weights())
+ if err != nil {
+ w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
+ } else {
+ w.weights.Store(weights)
+ }
+ case <-w.shutdownCh:
+ ticker.Stop()
+ return
+ }
+ }
+ }
+
+ func convertWeightsFromDynamicConfig(
+ weightsFromDC map[string]interface{},
+ ) (map[int]int, error) {
+ weights := make(map[int]int)
+ for key, value := range weightsFromDC {
+ priority, err := strconv.Atoi(strings.TrimSpace(key))
+ if err != nil {
+ return nil, err
+ }
+ weight, ok := value.(int)
+ if !ok {
+ return nil, fmt.Errorf("failed to convert weight %v", value)
+ }
+ weights[priority] = weight
+ }
+ return weights, nil
+ }
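To make the conversion concrete: dynamic config delivers the weights as map[string]interface{} (priority as a string key, weight as an int value), and the scheduler stores them as map[int]int. A hypothetical test in the same package could exercise convertWeightsFromDynamicConfig as follows; the priority and weight values are made up for illustration.

package task

import "testing"

func TestConvertWeightsFromDynamicConfig(t *testing.T) {
    // Priorities arrive as string keys, weights as int values.
    weights, err := convertWeightsFromDynamicConfig(map[string]interface{}{
        "0": 5, // priority 0: 5 slots per dispatch round
        "1": 3,
        "2": 1,
    })
    if err != nil {
        t.Fatal(err)
    }
    if weights[0] != 5 || weights[1] != 3 || weights[2] != 1 {
        t.Fatalf("unexpected weights: %v", weights)
    }
}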