diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go index 035b8ff9e7b99..fc6b9b2f26e9e 100644 --- a/common/prque/lazyqueue.go +++ b/common/prque/lazyqueue.go @@ -26,9 +26,10 @@ import ( // LazyQueue is a priority queue data structure where priorities can change over // time and are only evaluated on demand. // Two callbacks are required: -// - priority evaluates the actual priority of an item -// - maxPriority gives an upper estimate for the priority in any moment between -// now and the given absolute time +// - priority evaluates the actual priority of an item +// - maxPriority gives an upper estimate for the priority in any moment between +// now and the given absolute time +// // If the upper estimate is exceeded then Update should be called for that item. // A global Refresh function should also be called periodically. type LazyQueue struct { @@ -36,14 +37,15 @@ type LazyQueue struct { // Items are stored in one of two internal queues ordered by estimated max // priority until the next and the next-after-next refresh. Update and Refresh // always places items in queue[1]. - queue [2]*sstack - popQueue *sstack - period time.Duration - maxUntil mclock.AbsTime - indexOffset int - setIndex SetIndexCallback - priority PriorityCallback - maxPriority MaxPriorityCallback + queue [2]*sstack + popQueue *sstack + period time.Duration + maxUntil mclock.AbsTime + indexOffset int + setIndex SetIndexCallback + priority PriorityCallback + maxPriority MaxPriorityCallback + lastRefresh1, lastRefresh2 mclock.AbsTime } type ( @@ -54,14 +56,17 @@ type ( // NewLazyQueue creates a new lazy queue func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { q := &LazyQueue{ - popQueue: newSstack(nil), - setIndex: setIndex, - priority: priority, - maxPriority: maxPriority, - clock: clock, - period: refreshPeriod} + popQueue: newSstack(nil), + setIndex: setIndex, + priority: priority, + maxPriority: maxPriority, + clock: clock, + period: refreshPeriod, + lastRefresh1: clock.Now(), + lastRefresh2: clock.Now(), + } q.Reset() - q.Refresh() + q.refresh(clock.Now()) return q } @@ -71,9 +76,19 @@ func (q *LazyQueue) Reset() { q.queue[1] = newSstack(q.setIndex1) } -// Refresh should be called at least with the frequency specified by the refreshPeriod parameter +// Refresh performs queue re-evaluation if necessary func (q *LazyQueue) Refresh() { - q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period) + now := q.clock.Now() + for time.Duration(now-q.lastRefresh2) >= q.period*2 { + q.refresh(now) + q.lastRefresh2 = q.lastRefresh1 + q.lastRefresh1 = now + } +} + +// refresh re-evaluates items in the older queue and swaps the two queues +func (q *LazyQueue) refresh(now mclock.AbsTime) { + q.maxUntil = now + mclock.AbsTime(q.period) for q.queue[0].Len() != 0 { q.Push(heap.Pop(q.queue[0]).(*item).value) } @@ -139,6 +154,7 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo } return } + nextIndex = q.peekIndex() // re-check because callback is allowed to push items back } } }