14 changes: 13 additions & 1 deletion common/dynamicconfig/constants.go
@@ -1586,13 +1586,25 @@ NOTE: The outbound queue has a separate configuration: outboundQueuePendingTaskMaxCount.
)
QueueMaxPredicateSize = NewGlobalIntSetting(
"history.queueMaxPredicateSize",
0,
10*1024,
`The max size of the multi-cursor predicate structure stored in the shard info record. 0 is considered
unlimited. When the predicate size is surpassed for a given scope, the predicate is converted to a universal predicate,
which causes all tasks in the scope's range to eventually be reprocessed without applying any filtering logic.
NOTE: The outbound queue has a separate configuration: outboundQueueMaxPredicateSize.
`,
)
QueueMoveGroupTaskCountBase = NewGlobalIntSetting(
"history.queueMoveGroupTaskCountBase",
500,
`The base pending task count for a task group to be moved to the next level reader.
The actual count is calculated as base * (multiplier ^ level)`,
)
QueueMoveGroupTaskCountMultiplier = NewGlobalFloatSetting(
"history.queueMoveGroupTaskCountMultiplier",
3.0,
`The multiplier used to calculate the number of pending tasks for a task group to be moved to the next level reader.
The actual count is calculated as base * (multiplier ^ level)`,
)

TaskSchedulerEnableRateLimiter = NewGlobalBoolSetting(
"history.taskSchedulerEnableRateLimiter",
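For reference, the formula described by these two settings, base * (multiplier ^ level), works out as follows with the new defaults of 500 and 3.0. The sketch below is a minimal standalone illustration; the moveGroupThreshold helper is hypothetical and not part of this change:

package main

import (
	"fmt"
	"math"
)

// moveGroupThreshold mirrors the documented formula: base * (multiplier ^ level).
func moveGroupThreshold(base int, multiplier float64, level int) int {
	return int(float64(base) * math.Pow(multiplier, float64(level)))
}

func main() {
	// With base=500 and multiplier=3.0: level 0 -> 500, level 1 -> 1500, level 2 -> 4500.
	for level := 0; level < 3; level++ {
		fmt.Printf("reader level %d: move group when pending tasks >= %d\n",
			level, moveGroupThreshold(500, 3.0, level))
	}
}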
2 changes: 2 additions & 0 deletions service/history/archival_queue_factory.go
@@ -190,6 +190,8 @@ func (f *archivalQueueFactory) newScheduledQueue(shard historyi.ShardContext, ex
CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.ArchivalQueueMaxReaderCount,
MoveGroupTaskCountBase: f.Config.QueueMoveGroupTaskCountBase,
MoveGroupTaskCountMultiplier: f.Config.QueueMoveGroupTaskCountMultiplier,
},
f.HostReaderRateLimiter,
logger,
24 changes: 14 additions & 10 deletions service/history/configs/config.go
@@ -91,11 +91,13 @@ type Config struct {
StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter

QueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn
QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxPredicateSize dynamicconfig.IntPropertyFn
QueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn
QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxPredicateSize dynamicconfig.IntPropertyFn
QueueMoveGroupTaskCountBase dynamicconfig.IntPropertyFn
QueueMoveGroupTaskCountMultiplier dynamicconfig.FloatPropertyFn

TaskDLQEnabled dynamicconfig.BoolPropertyFn
TaskDLQUnexpectedErrorAttempts dynamicconfig.IntPropertyFn
@@ -451,11 +453,13 @@ func NewConfig(
StandbyTaskMissingEventsResendDelay: dynamicconfig.StandbyTaskMissingEventsResendDelay.Get(dc),
StandbyTaskMissingEventsDiscardDelay: dynamicconfig.StandbyTaskMissingEventsDiscardDelay.Get(dc),

QueuePendingTaskCriticalCount: dynamicconfig.QueuePendingTaskCriticalCount.Get(dc),
QueueReaderStuckCriticalAttempts: dynamicconfig.QueueReaderStuckCriticalAttempts.Get(dc),
QueueCriticalSlicesCount: dynamicconfig.QueueCriticalSlicesCount.Get(dc),
QueuePendingTaskMaxCount: dynamicconfig.QueuePendingTaskMaxCount.Get(dc),
QueueMaxPredicateSize: dynamicconfig.QueueMaxPredicateSize.Get(dc),
QueuePendingTaskCriticalCount: dynamicconfig.QueuePendingTaskCriticalCount.Get(dc),
QueueReaderStuckCriticalAttempts: dynamicconfig.QueueReaderStuckCriticalAttempts.Get(dc),
QueueCriticalSlicesCount: dynamicconfig.QueueCriticalSlicesCount.Get(dc),
QueuePendingTaskMaxCount: dynamicconfig.QueuePendingTaskMaxCount.Get(dc),
QueueMaxPredicateSize: dynamicconfig.QueueMaxPredicateSize.Get(dc),
QueueMoveGroupTaskCountBase: dynamicconfig.QueueMoveGroupTaskCountBase.Get(dc),
QueueMoveGroupTaskCountMultiplier: dynamicconfig.QueueMoveGroupTaskCountMultiplier.Get(dc),

TaskDLQEnabled: dynamicconfig.HistoryTaskDLQEnabled.Get(dc),
TaskDLQUnexpectedErrorAttempts: dynamicconfig.HistoryTaskDLQUnexpectedErrorAttempts.Get(dc),
2 changes: 2 additions & 0 deletions service/history/outbound_queue_factory.go
@@ -274,6 +274,8 @@ func (f *outboundQueueFactory) CreateQueue(
CheckpointInterval: f.Config.OutboundProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.OutboundProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.OutboundQueueMaxReaderCount,
MoveGroupTaskCountBase: f.Config.QueueMoveGroupTaskCountBase,
MoveGroupTaskCountMultiplier: f.Config.QueueMoveGroupTaskCountMultiplier,
},
f.hostReaderRateLimiter,
queues.GrouperStateMachineNamespaceIDAndDestination{},
2 changes: 1 addition & 1 deletion service/history/queues/action.go
@@ -5,6 +5,6 @@ type (
// It is created and run by Mitigator upon receiving an Alert.
Action interface {
Name() string
Run(*ReaderGroup)
Run(*ReaderGroup) (actionTaken bool)
}
)
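With this change, Run reports whether the action actually performed any mitigation. A minimal hypothetical implementation of the updated interface might look like the sketch below; noopAction is illustrative only and does not exist in this PR:

// noopAction is a hypothetical Action that never mitigates anything.
type noopAction struct{}

func (a *noopAction) Name() string { return "noop" }

// Run follows the new (actionTaken bool) contract: returning false tells the
// caller that no slices were split, merged, or unloaded.
func (a *noopAction) Run(readerGroup *ReaderGroup) (actionTaken bool) {
	return false
}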
106 changes: 106 additions & 0 deletions service/history/queues/action_move_group.go
@@ -0,0 +1,106 @@
package queues

import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)

type actionMoveGroup struct {
maxReaderCount int
grouper Grouper
moveGroupTaskCountBase int
moveGroupTaskCountMultiplier float64
logger log.Logger
}

func newMoveGroupAction(
maxReaderCount int,
grouper Grouper,
moveGroupTaskCountBase int,
moveGroupTaskCountMultiplier float64,
logger log.Logger,
) *actionMoveGroup {
return &actionMoveGroup{
maxReaderCount: maxReaderCount,
grouper: grouper,
moveGroupTaskCountBase: moveGroupTaskCountBase,
moveGroupTaskCountMultiplier: moveGroupTaskCountMultiplier,
logger: logger,
}
}

func (a *actionMoveGroup) Name() string {
return "move-group"
}

func (a *actionMoveGroup) Run(readerGroup *ReaderGroup) bool {

// Move task groups from reader x to x+1 if the # of pending tasks for a group is higher than
// a threshold. The threshold is calculated as:
// moveGroupTaskCountBase * (moveGroupTaskCountMultiplier ^ x)
//
// If after moving a group to reader x+1, the # of pending tasks for that group becomes higher than
// the threshold for reader x+1, it will be moved to reader x+2 in the next iteration.

// TODO: instead of moving task groups down by just one reader, directly move it to the reader level
// based on the total number of pending tasks across all readers.

moved := false
moveGroupMinTaskCount := a.moveGroupTaskCountBase
for readerID := DefaultReaderId; readerID+1 < int64(a.maxReaderCount); readerID++ {
if readerID != DefaultReaderId {
moveGroupMinTaskCount = int(float64(moveGroupMinTaskCount) * a.moveGroupTaskCountMultiplier)
}

reader, ok := readerGroup.ReaderByID(readerID)
if !ok {
continue
}

pendingTaskPerGroup := make(map[any]int)
reader.WalkSlices(func(s Slice) {
for key, pendingTaskCount := range s.TaskStats().PendingPerKey {
pendingTaskPerGroup[key] += pendingTaskCount
}
})

groupsToMove := make([]any, 0, len(pendingTaskPerGroup))
for key, pendingTaskCount := range pendingTaskPerGroup {
if pendingTaskCount >= moveGroupMinTaskCount {
groupsToMove = append(groupsToMove, key)
a.logger.Info("Too many pending tasks, moving group to next reader",
tag.QueueReaderID(readerID),
tag.Counter(pendingTaskCount),
tag.Value(key),
)
}
}

if len(groupsToMove) == 0 {
continue
}

predicateForSplit := a.grouper.Predicate(groupsToMove)

var slicesToMove []Slice
reader.SplitSlices(func(s Slice) ([]Slice, bool) {
// Technically we don't need this empty scope check, but it helps avoid
// unnecessary allocation and task movement.
scope := s.Scope()
splitScope, _ := scope.SplitByPredicate(predicateForSplit)
if splitScope.IsEmpty() {
return nil, false
}

split, remain := s.SplitByPredicate(predicateForSplit)
slicesToMove = append(slicesToMove, split)
return []Slice{remain}, true
})

nextReader := readerGroup.GetOrCreateReader(readerID + 1)
nextReader.MergeSlices(slicesToMove...)
moved = true
}

return moved
}
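As a concrete illustration of the escalation described in the comment at the top of Run, assuming the default base of 500 and multiplier of 3.0: a group with 1,600 pending tasks in the default reader exceeds level 0's threshold of 500, so it is split out and merged into reader 1; because 1,600 also exceeds reader 1's threshold of 1,500 (500 * 3.0^1), it is moved on to reader 2 in the next iteration, and so on up to reader MaxReaderCount - 1.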
11 changes: 6 additions & 5 deletions service/history/queues/action_pending_task_count.go
@@ -48,16 +48,16 @@ func (a *actionQueuePendingTask) Name() string {
return "queue-pending-task"
}

func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) bool {
// first check if the alert is still valid
if a.monitor.GetTotalPendingTaskCount() <= a.attributes.CiriticalPendingTaskCount {
return
return false
}

// then try to shrink existing slices, which may reduce pending task count
readers := readerGroup.Readers()
if a.tryShrinkSlice(readers) {
return
if a.shrinkSliceLowTaskCount(readers) {
return false
}

// have to unload pending tasks to reduce pending task count
@@ -67,9 +67,10 @@ func (a *actionQueuePendingTask) Run(readerGroup *ReaderGroup) {
int(float64(a.attributes.CiriticalPendingTaskCount) * targetLoadFactor),
)
a.splitAndClearSlice(readers, readerGroup)
return true
}

func (a *actionQueuePendingTask) tryShrinkSlice(
func (a *actionQueuePendingTask) shrinkSliceLowTaskCount(
readers map[int64]Reader,
) bool {
for _, reader := range readers {
7 changes: 4 additions & 3 deletions service/history/queues/action_reader_stuck.go
@@ -29,11 +29,11 @@ func (a *actionReaderStuck) Name() string {
return "reader-stuck"
}

func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) bool {
reader, ok := readerGroup.ReaderByID(a.attributes.ReaderID)
if !ok {
a.logger.Info("Failed to get queue with readerID for reader stuck action", tag.QueueReaderID(a.attributes.ReaderID))
return
return false
}

stuckRange := NewRange(
@@ -77,9 +77,10 @@ func (a *actionReaderStuck) Run(readerGroup *ReaderGroup) {
})

if len(splitSlices) == 0 {
return
return false
}

nextReader := readerGroup.GetOrCreateReader(a.attributes.ReaderID + 1)
nextReader.MergeSlices(splitSlices...)
return true
}
28 changes: 13 additions & 15 deletions service/history/queues/action_slice_count.go
@@ -34,10 +34,10 @@ func (a *actionSliceCount) Name() string {
return "slice-count"
}

func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
func (a *actionSliceCount) Run(readerGroup *ReaderGroup) (actionTaken bool) {
// first check if the alert is still valid
if a.monitor.GetTotalSliceCount() <= a.attributes.CriticalSliceCount {
return
return false
}

// then try to shrink existing slices, which may reduce slice count
@@ -47,7 +47,7 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
}
currentSliceCount := a.monitor.GetTotalSliceCount()
if currentSliceCount <= a.attributes.CriticalSliceCount {
return
return false
}

// have to compact (force merge) slices to reduce slice count
@@ -72,52 +72,50 @@ func (a *actionSliceCount) Run(readerGroup *ReaderGroup) {
// So compact slices with non-universal predicate first to minimize the impact
// on other namespaces upon shard reload.

if a.findAndCompactCandidates(
actionTaken = true
if a.findAndCompactLowSliceCount(
readers,
isNotDefaultReader,
isNotUniversalPredicate,
preferredSliceCount,
) {
return
return actionTaken
}

if a.findAndCompactCandidates(
if a.findAndCompactLowSliceCount(
readers,
isDefaultReader,
isNotUniversalPredicate,
preferredSliceCount,
) {
return
return actionTaken
}

if a.findAndCompactCandidates(
if a.findAndCompactLowSliceCount(
readers,
isNotDefaultReader,
isUniversalPredicate,
a.attributes.CriticalSliceCount,
) {
return
return actionTaken
}

a.findAndCompactCandidates(
_ = a.findAndCompactLowSliceCount(
readers,
isDefaultReader,
isUniversalPredicate,
a.attributes.CriticalSliceCount,
)
return actionTaken
}

func (a *actionSliceCount) findAndCompactCandidates(
func (a *actionSliceCount) findAndCompactLowSliceCount(
readers map[int64]Reader,
readerPredicate func(int64) bool,
slicePredicate SlicePredicate,
targetSliceCount int,
) bool {
currentSliceCount := a.monitor.GetTotalSliceCount()
if currentSliceCount <= targetSliceCount {
return true
}

candidates := make([]compactCandidate, 0, currentSliceCount)
for readerID, reader := range readers {
if !readerPredicate(readerID) {
11 changes: 6 additions & 5 deletions service/history/queues/action_slice_predicate.go
@@ -46,14 +46,14 @@ func (a *slicePredicateAction) Name() string {
return "slice-predicate"
}

func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) {
func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) bool {
reader, ok := readerGroup.ReaderByID(DefaultReaderId)
if !ok {
return
return false
}

if int64(a.maxReaderCount) <= DefaultReaderId+1 {
return
return false
}

sliceCount := a.monitor.GetSliceCount(DefaultReaderId)
@@ -72,7 +72,7 @@ func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) {
if !hasNonUniversalPredicate ||
(pendingTasks < moveSliceDefaultReaderMinPendingTaskCount &&
sliceCount < moveSliceDefaultReaderMinSliceCount) {
return
return false
}

var moveSlices []Slice
@@ -86,9 +86,10 @@ func (a *slicePredicateAction) Run(readerGroup *ReaderGroup) {
})

if len(moveSlices) == 0 {
return
return false
}

nextReader := readerGroup.GetOrCreateReader(DefaultReaderId + 1)
nextReader.MergeSlices(moveSlices...)
return true
}