Skip to content

Commit

Permalink
Implement task processing queue collection (uber#3260)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and yux0 committed May 4, 2021
1 parent a65063e commit c9c2fc2
Show file tree
Hide file tree
Showing 6 changed files with 966 additions and 33 deletions.
8 changes: 5 additions & 3 deletions service/history/queue/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type (
State() ProcessingQueueState
Split(ProcessingQueueSplitPolicy) []ProcessingQueue
Merge(ProcessingQueue) []ProcessingQueue
AddTasks(map[task.Key]task.Task)
AddTasks(map[task.Key]task.Task, bool)
UpdateAckLevel()
// TODO: add Offload() method
}
Expand All @@ -69,11 +69,13 @@ type (
// ProcessingQueueCollection manages a list of non-overlapping ProcessingQueues
// and keep track of the current active ProcessingQueue
ProcessingQueueCollection interface {
Level() int
Queues() []ProcessingQueue
ActiveQueue() ProcessingQueue
AddTasks(map[task.Key]task.Task, bool)
UpdateAckLevels()
Split(ProcessingQueueSplitPolicy) []ProcessingQueue
Merge([]ProcessingQueue)
AddTasks(map[task.Key]task.Task)
ActiveQueue() ProcessingQueue
// TODO: add Offload() method
}

Expand Down
32 changes: 23 additions & 9 deletions service/history/queue/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions service/history/queue/processing_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ func (q *processingQueueImpl) Merge(
))
}

overlappingQueueAckLevel := maxTaskKey(q1.state.ackLevel, q2.state.ackLevel)
newQueueStates = append(newQueueStates, newProcessingQueueState(
q1.state.level,
maxTaskKey(q1.state.ackLevel, q2.state.ackLevel),
minTaskKey(q1.state.readLevel, q2.state.readLevel),
overlappingQueueAckLevel,
maxTaskKey(minTaskKey(q1.state.readLevel, q2.state.readLevel), overlappingQueueAckLevel),
minTaskKey(q1.state.maxLevel, q2.state.maxLevel),
q1.state.domainFilter.Merge(q2.state.domainFilter),
))
Expand All @@ -234,6 +235,7 @@ func (q *processingQueueImpl) Merge(

func (q *processingQueueImpl) AddTasks(
tasks map[task.Key]task.Task,
more bool,
) {
for key, task := range tasks {
if _, loaded := q.outstandingTasks[key]; loaded {
Expand All @@ -255,6 +257,10 @@ func (q *processingQueueImpl) AddTasks(
q.state.readLevel = key
}
}

if !more {
q.state.readLevel = q.state.maxLevel
}
}

func (q *processingQueueImpl) UpdateAckLevel() {
Expand All @@ -275,6 +281,10 @@ func (q *processingQueueImpl) UpdateAckLevel() {
q.state.ackLevel = key
delete(q.outstandingTasks, key)
}

if len(q.outstandingTasks) == 0 && q.state.readLevel == q.state.maxLevel {
q.state.ackLevel = q.state.maxLevel
}
}

func splitProcessingQueue(
Expand Down
173 changes: 173 additions & 0 deletions service/history/queue/processing_queue_collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
"sort"

"github.com/uber/cadence/service/history/task"
)

type (
processingQueueCollection struct {
level int
queues []ProcessingQueue
activeQueue ProcessingQueue
}
)

// NewProcessingQueueCollection creates a new collection for non-overlapping queues
func NewProcessingQueueCollection(
level int,
queues []ProcessingQueue,
) ProcessingQueueCollection {
sortProcessingQueue(queues)
queueCollection := &processingQueueCollection{
level: level,
queues: queues,
}
queueCollection.resetActiveQueue()

return queueCollection
}

func (c *processingQueueCollection) Level() int {
return c.level
}

func (c *processingQueueCollection) Queues() []ProcessingQueue {
return c.queues
}

func (c *processingQueueCollection) ActiveQueue() ProcessingQueue {
return c.activeQueue
}

func (c *processingQueueCollection) AddTasks(
tasks map[task.Key]task.Task,
more bool,
) {
c.ActiveQueue().AddTasks(tasks, more)

if !more {
c.resetActiveQueue()
}
}

func (c *processingQueueCollection) UpdateAckLevels() {
remainingQueues := make([]ProcessingQueue, 0, len(c.queues))

for _, queue := range c.queues {
queue.UpdateAckLevel()
if !taskKeyEquals(queue.State().AckLevel(), queue.State().MaxLevel()) {
remainingQueues = append(remainingQueues, queue)
continue
}
}

c.queues = remainingQueues
}

func (c *processingQueueCollection) Split(
policy ProcessingQueueSplitPolicy,
) []ProcessingQueue {
newQueues := make([]ProcessingQueue, 0, len(c.queues))
nextLevelQueues := []ProcessingQueue{}

for _, queue := range c.queues {
splitQueues := queue.Split(policy)
sortProcessingQueue(splitQueues)
for _, splitQueue := range splitQueues {
if splitQueue.State().Level() != c.level {
nextLevelQueues = append(nextLevelQueues, splitQueue)
} else {
newQueues = append(newQueues, splitQueue)
}
}
}

c.queues = newQueues

c.resetActiveQueue()

return nextLevelQueues
}

func (c *processingQueueCollection) Merge(
incomingQueues []ProcessingQueue,
) {
sortProcessingQueue(incomingQueues)

newQueues := make([]ProcessingQueue, 0, len(c.queues)+len(incomingQueues))

currentQueueIdx := 0
incomingQueueIdx := 0
for incomingQueueIdx < len(incomingQueues) && currentQueueIdx < len(c.queues) {
mergedQueues := c.queues[currentQueueIdx].Merge(incomingQueues[incomingQueueIdx])
sortProcessingQueue(mergedQueues)
newQueues = append(newQueues, mergedQueues[:len(mergedQueues)-1]...)

lastMergedQueue := mergedQueues[len(mergedQueues)-1]
if currentQueueIdx+1 == len(c.queues) ||
!c.queues[currentQueueIdx+1].State().AckLevel().Less(lastMergedQueue.State().MaxLevel()) {

newQueues = append(newQueues, lastMergedQueue)
incomingQueueIdx++
} else {
incomingQueues[incomingQueueIdx] = lastMergedQueue
}

currentQueueIdx++
}

if incomingQueueIdx < len(incomingQueues) {
newQueues = append(newQueues, incomingQueues[incomingQueueIdx:]...)
}

if currentQueueIdx < len(c.queues) {
newQueues = append(newQueues, c.queues[currentQueueIdx:]...)
}

c.queues = newQueues

c.resetActiveQueue()
}

func (c *processingQueueCollection) resetActiveQueue() {
for _, queue := range c.queues {
if !taskKeyEquals(queue.State().ReadLevel(), queue.State().MaxLevel()) {
c.activeQueue = queue
return
}
}
c.activeQueue = nil
}

func sortProcessingQueue(
queues []ProcessingQueue,
) {
sort.Slice(queues, func(i, j int) bool {
if queues[i].State().Level() == queues[j].State().Level() {
return queues[i].State().AckLevel().Less(queues[j].State().AckLevel())
}
return queues[i].State().Level() < queues[j].State().Level()
})
}
Loading

0 comments on commit c9c2fc2

Please sign in to comment.