Wire up priority task processor implementation #3146

Merged 4 commits on Mar 31, 2020
Changes from 3 commits
15 changes: 15 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -69,6 +69,7 @@ var keys = map[Key]string{
EnableBatcher: "worker.enableBatcher",
EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker",
EnableStickyQuery: "system.enableStickyQuery",
EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
@@ -145,6 +146,10 @@ var keys = map[Key]string{
StandbyTaskMissingEventsResendDelay: "history.standbyTaskMissingEventsResendDelay",
StandbyTaskMissingEventsDiscardDelay: "history.standbyTaskMissingEventsDiscardDelay",
TaskProcessRPS: "history.taskProcessRPS",
TaskSchedulerType: "history.taskSchedulerType",
TaskSchedulerWorkerCount: "history.taskSchedulerWorkerCount",
TaskSchedulerQueueSize: "history.taskSchedulerQueueSize",
TaskSchedulerRoundRobinWeights: "history.taskSchedulerRoundRobinWeight",
TimerTaskBatchSize: "history.timerTaskBatchSize",
TimerTaskWorkerCount: "history.timerTaskWorkerCount",
TimerTaskMaxRetryCount: "history.timerTaskMaxRetryCount",
@@ -311,6 +316,8 @@ const (
MaxDecisionStartToCloseSeconds
// DisallowQuery is the key to disallow query for a domain
DisallowQuery
// EnablePriorityTaskProcessor is the key for enabling priority task processor
EnablePriorityTaskProcessor

// BlobSizeLimitError is the per event blob size limit
BlobSizeLimitError
@@ -458,6 +465,14 @@ const (
StandbyTaskMissingEventsDiscardDelay
// TaskProcessRPS is the task processing rate per second for each domain
TaskProcessRPS
// TaskSchedulerType is the task scheduler type for priority task processor
TaskSchedulerType
// TaskSchedulerWorkerCount is the number of workers per shard in task scheduler
TaskSchedulerWorkerCount
// TaskSchedulerQueueSize is the size of the task channel in the task scheduler
TaskSchedulerQueueSize
// TaskSchedulerRoundRobinWeights is the priority-to-weight mapping for the weighted round robin task scheduler
TaskSchedulerRoundRobinWeights
// TimerTaskBatchSize is batch size for timer processor to process tasks
TimerTaskBatchSize
// TimerTaskWorkerCount is number of task workers for timer processor
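The new keys above are resolved through dynamic config on the history service side. A minimal resolution sketch, not part of this PR: only the Key constants come from this diff, while the Collection accessor names, their (key, default) signatures, and the default values are assumptions used for illustration.

```go
package sketch

import (
	"fmt"

	"github.com/uber/cadence/common/service/dynamicconfig"
)

// resolvePriorityTaskProcessorSettings is a hypothetical helper. Only the
// dynamicconfig Key constants are taken from this PR; the Collection
// accessors and the default values shown here are assumptions.
func resolvePriorityTaskProcessorSettings(dc *dynamicconfig.Collection) {
	enabled := dc.GetBoolProperty(dynamicconfig.EnablePriorityTaskProcessor, false)()
	schedulerType := dc.GetIntProperty(dynamicconfig.TaskSchedulerType, 1)()
	workerCount := dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount, 20)()
	queueSize := dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize, 2000)()

	fmt.Println(enabled, schedulerType, workerCount, queueSize)
}
```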
6 changes: 3 additions & 3 deletions common/task/fifoTaskScheduler.go
@@ -58,18 +58,18 @@ type (
// This scheduler is only for development purposes.
func NewFIFOTaskScheduler(
logger log.Logger,
metricsScope metrics.Scope,
metricsClient metrics.Client,
options *FIFOTaskSchedulerOptions,
) Scheduler {
return &fifoTaskSchedulerImpl{
status: common.DaemonStatusInitialized,
logger: logger,
metricsScope: metricsScope,
metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
taskCh: make(chan PriorityTask, options.QueueSize),
shutdownCh: make(chan struct{}),
processor: NewParallelTaskProcessor(
logger,
metricsScope,
metricsClient,
&ParallelTaskProcessorOptions{
QueueSize: options.QueueSize,
WorkerCount: options.WorkerCount,
2 changes: 1 addition & 1 deletion common/task/fifoTaskScheduler_test.go
@@ -63,7 +63,7 @@ func (s *fifoTaskSchedulerSuite) SetupTest() {
s.queueSize = 2
s.scheduler = NewFIFOTaskScheduler(
loggerimpl.NewDevelopmentForTest(s.Suite),
metrics.NewClient(tally.NoopScope, metrics.Common).Scope(metrics.TaskSchedulerScope),
metrics.NewClient(tally.NoopScope, metrics.Common),
&FIFOTaskSchedulerOptions{
QueueSize: s.queueSize,
WorkerCount: 1,
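Mirroring the updated test above, here is a minimal caller-side sketch of the new signature: NewFIFOTaskScheduler now takes a metrics.Client and resolves metrics.TaskSchedulerScope internally. Everything outside that signature (the logger constructor and the option values) is illustrative rather than taken from this PR.

```go
package sketch

import (
	"github.com/uber-go/tally"

	"github.com/uber/cadence/common/log/loggerimpl"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/task"
)

func startFIFOScheduler() task.Scheduler {
	logger := loggerimpl.NewNopLogger() // assumed no-op constructor; any log.Logger works
	metricsClient := metrics.NewClient(tally.NoopScope, metrics.Common)

	scheduler := task.NewFIFOTaskScheduler(
		logger,
		metricsClient, // the scheduler now derives its own metrics scope
		&task.FIFOTaskSchedulerOptions{
			QueueSize:   100, // illustrative values
			WorkerCount: 10,
		},
	)
	scheduler.Start()
	return scheduler
}
```

The same pattern applies to NewParallelTaskProcessor and NewWeightedRoundRobinTaskScheduler below: callers hand over the metrics client and each component picks its own scope.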
4 changes: 2 additions & 2 deletions common/task/parallelTaskProcessor.go
@@ -59,15 +59,15 @@ var (
// NewParallelTaskProcessor creates a new parallel task Processor
func NewParallelTaskProcessor(
logger log.Logger,
metricsScope metrics.Scope,
metricsClient metrics.Client,
options *ParallelTaskProcessorOptions,
) Processor {
return &parallelTaskProcessorImpl{
status: common.DaemonStatusInitialized,
tasksCh: make(chan Task, options.QueueSize),
shutdownCh: make(chan struct{}),
logger: logger,
metricsScope: metricsScope,
metricsScope: metricsClient.Scope(metrics.ParallelTaskProcessingScope),
options: options,
}
}
2 changes: 1 addition & 1 deletion common/task/parallelTaskProcessor_test.go
@@ -65,7 +65,7 @@ func (s *parallelTaskProcessorSuite) SetupTest() {

s.processor = NewParallelTaskProcessor(
loggerimpl.NewDevelopmentForTest(s.Suite),
metrics.NewClient(tally.NoopScope, metrics.Common).Scope(metrics.ParallelTaskProcessingScope),
metrics.NewClient(tally.NoopScope, metrics.Common),
&ParallelTaskProcessorOptions{
QueueSize: 0,
WorkerCount: 1,
39 changes: 20 additions & 19 deletions common/task/sequentialTaskProcessor.go
@@ -25,12 +25,11 @@ import (
"sync/atomic"
"time"

"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
)

type (
@@ -44,15 +43,19 @@ type (
taskQueueFactory SequentialTaskQueueFactory
taskqueueChan chan SequentialTaskQueue

metricsScope int
metricsClient metrics.Client
logger log.Logger
metricsScope metrics.Scope
logger log.Logger
}
)

// NewSequentialTaskProcessor create a new sequential tasks processor
func NewSequentialTaskProcessor(coroutineSize int, taskQueueHashFn collection.HashFunc, taskQueueFactory SequentialTaskQueueFactory,
metricsClient metrics.Client, logger log.Logger) Processor {
func NewSequentialTaskProcessor(
coroutineSize int,
taskQueueHashFn collection.HashFunc,
taskQueueFactory SequentialTaskQueueFactory,
metricsClient metrics.Client,
logger log.Logger,
) Processor {

return &sequentialTaskProcessorImpl{
status: common.DaemonStatusInitialized,
@@ -61,10 +64,8 @@ func NewSequentialTaskProcessor(coroutineSize int, taskQueueHashFn collection.Ha
taskqueues: collection.NewShardedConcurrentTxMap(1024, taskQueueHashFn),
taskQueueFactory: taskQueueFactory,
taskqueueChan: make(chan SequentialTaskQueue, coroutineSize),

metricsScope: metrics.SequentialTaskProcessingScope,
metricsClient: metricsClient,
logger: logger,
metricsScope: metricsClient.Scope(metrics.SequentialTaskProcessingScope),
logger: logger,
}
}

@@ -94,8 +95,8 @@ func (t *sequentialTaskProcessorImpl) Stop() {

func (t *sequentialTaskProcessorImpl) Submit(task Task) error {

t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequest)
metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskSubmitLatency)
t.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequest)
metricsTimer := t.metricsScope.StartTimer(metrics.SequentialTaskSubmitLatency)
defer metricsTimer.Stop()

taskqueue := t.taskQueueFactory(task)
@@ -116,12 +117,12 @@ func (t *sequentialTaskProcessorImpl) Submit(task Task) error {
// if function evaluated, meaning that the task set is
// already dispatched
if fnEvaluated {
t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequestTaskQueueExist)
t.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequestTaskQueueExist)
return nil
}

// need to dispatch this task set
t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequestTaskQueueMissing)
t.metricsScope.IncCounter(metrics.SequentialTaskSubmitRequestTaskQueueMissing)
select {
case <-t.shutdownChan:
case t.taskqueueChan <- taskqueue:
@@ -138,7 +139,7 @@ func (t *sequentialTaskProcessorImpl) pollAndProcessTaskQueue() {
case <-t.shutdownChan:
return
case taskqueue := <-t.taskqueueChan:
metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskQueueProcessingLatency)
metricsTimer := t.metricsScope.StartTimer(metrics.SequentialTaskQueueProcessingLatency)
t.processTaskQueue(taskqueue)
metricsTimer.Stop()
}
@@ -152,7 +153,7 @@ func (t *sequentialTaskProcessorImpl) processTaskQueue(taskqueue SequentialTaskQ
return
default:
queueSize := taskqueue.Len()
t.metricsClient.RecordTimer(t.metricsScope, metrics.SequentialTaskQueueSize, time.Duration(queueSize))
t.metricsScope.RecordTimer(metrics.SequentialTaskQueueSize, time.Duration(queueSize))
if queueSize > 0 {
t.processTaskOnce(taskqueue)
} else {
@@ -171,7 +172,7 @@ func (t *sequentialTaskProcessorImpl) processTaskQueue(taskqueue SequentialTaskQ
}

func (t *sequentialTaskProcessorImpl) processTaskOnce(taskqueue SequentialTaskQueue) {
metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskTaskProcessingLatency)
metricsTimer := t.metricsScope.StartTimer(metrics.SequentialTaskTaskProcessingLatency)
defer metricsTimer.Stop()

task := taskqueue.Remove()
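The net effect of this file's change: instead of carrying a metrics.Client plus an int scope index and passing both on every emission, the processor resolves its metrics.Scope once at construction and emits against it. A small sketch of the resulting pattern; the helper function is hypothetical, while the Scope methods and metric names are the ones used in this diff.

```go
package sketch

import (
	"time"

	"github.com/uber/cadence/common/metrics"
)

// emitSubmitMetrics is a hypothetical helper showing the scope-first pattern:
// the scope is resolved once via metricsClient.Scope(metrics.SequentialTaskProcessingScope)
// in the constructor, then counters and timers are emitted directly on it.
func emitSubmitMetrics(scope metrics.Scope, queueSize int) {
	scope.IncCounter(metrics.SequentialTaskSubmitRequest)
	sw := scope.StartTimer(metrics.SequentialTaskSubmitLatency)
	defer sw.Stop()

	scope.RecordTimer(metrics.SequentialTaskQueueSize, time.Duration(queueSize))
}
```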
74 changes: 64 additions & 10 deletions common/task/weightedRoundRobinTaskScheduler.go
@@ -23,6 +23,8 @@ package task
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@@ -38,7 +40,7 @@ import (
type (
// WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
WeightedRoundRobinTaskSchedulerOptions struct {
Weights map[int]dynamicconfig.IntPropertyFn
Weights dynamicconfig.MapPropertyFn
QueueSize int
WorkerCount int
RetryPolicy backoff.RetryPolicy
@@ -48,6 +50,7 @@ type (
sync.RWMutex

status int32
weights atomic.Value // store the currently used weights
taskChs map[int]chan PriorityTask
shutdownCh chan struct{}
notifyCh chan struct{}
@@ -61,7 +64,8 @@ type (
)

const (
wRRTaskProcessorQueueSize = 1
wRRTaskProcessorQueueSize = 1
defaultUpdateWeightsInterval = 5 * time.Second
)

var (
@@ -72,31 +76,39 @@ var (
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
func NewWeightedRoundRobinTaskScheduler(
logger log.Logger,
metricsScope metrics.Scope,
metricsClient metrics.Client,
options *WeightedRoundRobinTaskSchedulerOptions,
) (Scheduler, error) {
if len(options.Weights) == 0 {
weights, err := convertWeightsFromDynamicConfig(options.Weights())
if err != nil {
return nil, err
}

if len(weights) == 0 {
return nil, errors.New("weight is not specified in the scheduler option")
}

return &weightedRoundRobinTaskSchedulerImpl{
scheduler := &weightedRoundRobinTaskSchedulerImpl{
status: common.DaemonStatusInitialized,
taskChs: make(map[int]chan PriorityTask),
shutdownCh: make(chan struct{}),
notifyCh: make(chan struct{}, 1),
logger: logger,
metricsScope: metricsScope,
metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
options: options,
processor: NewParallelTaskProcessor(
logger,
metricsScope,
metricsClient,
&ParallelTaskProcessorOptions{
QueueSize: wRRTaskProcessorQueueSize,
WorkerCount: options.WorkerCount,
RetryPolicy: options.RetryPolicy,
},
),
}, nil
}
scheduler.weights.Store(weights)

return scheduler, nil
}

func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
@@ -108,6 +120,7 @@ func (w *weightedRoundRobinTaskSchedulerImpl) Start() {

w.dispatcherWG.Add(1)
go w.dispatcher()
go w.updateWeights()

w.logger.Info("Weighted round robin task scheduler started.")
}
@@ -187,8 +200,9 @@ func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {

outstandingTasks = false
w.updateTaskChs(taskChs)
weights := w.getWeights()
for priority, taskCh := range taskChs {
for i := 0; i < w.options.Weights[priority](); i++ {
for i := 0; i < weights[priority]; i++ {
select {
case task := <-taskCh:
// dispatched at least one task in this round
@@ -212,7 +226,7 @@ func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(
priority int,
) (chan PriorityTask, error) {
if _, ok := w.options.Weights[priority]; !ok {
if _, ok := w.getWeights()[priority]; !ok {
return nil, fmt.Errorf("unknown task priority: %v", priority)
}

@@ -252,3 +266,43 @@ func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
// do not block if there's already a notification
}
}

func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
return w.weights.Load().(map[int]int)
}

func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
ticker := time.NewTicker(defaultUpdateWeightsInterval)
for {
select {
case <-ticker.C:
weights, err := convertWeightsFromDynamicConfig(w.options.Weights())
if err != nil {
w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
} else {
w.weights.Store(weights)
}
case <-w.shutdownCh:
ticker.Stop()
return
}
}
}

func convertWeightsFromDynamicConfig(
weightsFromDC map[string]interface{},
) (map[int]int, error) {
weights := make(map[int]int)
for key, value := range weightsFromDC {
priority, err := strconv.Atoi(strings.TrimSpace(key))
if err != nil {
return nil, err
}
weight, ok := value.(int)
if !ok {
return nil, fmt.Errorf("failed to convert weight %v", value)
}
weights[priority] = weight
}
return weights, nil
}
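A hypothetical test sketch (not part of this PR) showing the expected round trip: the dynamic config value arrives as a map[string]interface{} with stringified priorities, and convertWeightsFromDynamicConfig turns it into the map[int]int that the dispatcher consumes.

```go
package task

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestConvertWeightsFromDynamicConfig_Sketch(t *testing.T) {
	// Priorities arrive as string keys, weights as int values; e.g. priority 0
	// is drained up to 5 times per dispatch round, priority 3 only once.
	weightsFromDC := map[string]interface{}{
		"0": 5,
		"1": 3,
		"2": 2,
		"3": 1,
	}

	weights, err := convertWeightsFromDynamicConfig(weightsFromDC)
	assert.NoError(t, err)
	assert.Equal(t, map[int]int{0: 5, 1: 3, 2: 2, 3: 1}, weights)

	// Non-numeric priority keys (and non-int weights) are rejected.
	_, err = convertWeightsFromDynamicConfig(map[string]interface{}{"high": 5})
	assert.Error(t, err)
}
```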