-
Notifications
You must be signed in to change notification settings - Fork 741
/
Copy pathbounded_nonblocking_queue.go
90 lines (77 loc) · 2.32 KB
/
bounded_nonblocking_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package buffer
import "errors"
var (
	// errInvalidMaxSize is returned by NewBoundedQueue when the requested
	// maxSize is smaller than 1.
	errInvalidMaxSize = errors.New("maxSize must be greater than 0")

	// Compile-time assertion that *boundedQueue satisfies Queue.
	_ Queue[struct{}] = (*boundedQueue[struct{}])(nil)
)
// Queue is a first-in, first-out collection of elements of type T.
type Queue[T any] interface {
	// Push adds [elt] to the queue as its newest element.
	// When the queue is already full, the oldest element is evicted
	// to make room.
	Push(elt T)
	// Pop removes the oldest element from the queue and returns it.
	// The boolean is false if the queue is empty.
	Pop() (oldest T, ok bool)
	// Peek returns the oldest element while leaving it in the queue.
	// The boolean is false if the queue is empty.
	Peek() (oldest T, ok bool)
	// Index returns the element at position [i] while leaving it in the queue.
	// Index(0) is the oldest element and Index(Len() - 1) is the newest.
	// The boolean is false if no element exists at that position.
	Index(i int) (elt T, ok bool)
	// Len reports how many elements the queue currently holds.
	Len() int
	// List returns the queue's elements ordered from oldest to newest.
	// This is an O(n) operation and should be used sparingly.
	List() []T
}
// boundedQueue is the Queue implementation returned by NewBoundedQueue.
// It keeps up to [maxSize] entries in an ordered buffer
// and calls [onEvict] on any item that is evicted.
// Not safe for concurrent use.
type boundedQueue[T any] struct {
// Underlying storage; elements run from oldest (left) to newest (right).
deque Deque[T]
// Maximum number of elements held at once; Push evicts the oldest
// element when the deque already holds this many.
maxSize int
// Optional callback invoked with each element evicted by Push; may be nil.
onEvict func(T)
}
// NewBoundedQueue returns a bounded, non-blocking queue that holds at most
// [maxSize] elements.
// Whenever an element is evicted, [onEvict] is invoked with that element;
// a nil [onEvict] means evictions happen without any callback.
// [maxSize] must be >= 1, otherwise errInvalidMaxSize is returned.
// The returned queue is not safe for concurrent use.
func NewBoundedQueue[T any](maxSize int, onEvict func(T)) (Queue[T], error) {
	if maxSize <= 0 {
		return nil, errInvalidMaxSize
	}

	queue := &boundedQueue[T]{
		maxSize: maxSize,
		onEvict: onEvict,
	}
	// Ask for one slot more than maxSize so the deque never has to resize.
	queue.deque = NewUnboundedDeque[T](maxSize + 1)
	return queue, nil
}
// Push adds [elt] as the newest element of the queue.
// If the queue already holds maxSize elements, the oldest one is removed
// first and, when an eviction callback is set, passed to it.
func (b *boundedQueue[T]) Push(elt T) {
	if full := b.deque.Len() == b.maxSize; full {
		oldest, _ := b.deque.PopLeft()
		// The callback is optional; skip it when none was provided.
		if notify := b.onEvict; notify != nil {
			notify(oldest)
		}
	}
	// The result of PushRight is deliberately discarded.
	_ = b.deque.PushRight(elt)
}
// Pop removes and returns the leftmost (oldest) element of the underlying
// deque, forwarding the deque's boolean result unchanged.
func (b *boundedQueue[T]) Pop() (T, bool) {
	oldest, ok := b.deque.PopLeft()
	return oldest, ok
}
// Peek returns the leftmost (oldest) element of the underlying deque without
// removing it, forwarding the deque's boolean result unchanged.
func (b *boundedQueue[T]) Peek() (T, bool) {
	oldest, ok := b.deque.PeekLeft()
	return oldest, ok
}
// Index returns the element at position [i] of the underlying deque without
// removing it, forwarding the deque's boolean result unchanged.
func (b *boundedQueue[T]) Index(i int) (T, bool) {
	elt, ok := b.deque.Index(i)
	return elt, ok
}
// Len reports the number of elements currently stored, as counted by the
// underlying deque.
func (b *boundedQueue[T]) Len() int {
	size := b.deque.Len()
	return size
}
// List returns the slice of elements produced by the underlying deque's
// List method.
func (b *boundedQueue[T]) List() []T {
	elts := b.deque.List()
	return elts
}