-
Notifications
You must be signed in to change notification settings - Fork 13
/
blocking.go
297 lines (221 loc) · 6.51 KB
/
blocking.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package queue
import (
"sync"
)
// Compile-time assertion that *Blocking satisfies the Queue interface.
var _ Queue[any] = (*Blocking[any])(nil)
// Blocking is a Queue implementation that additionally supports operations
// that wait for the queue to have available items, and wait for a slot to
// become available in case the queue is full.
//
// NOTE: the Blocking queue shares most functionality with channels. If you
// do not make use of the Peek, Reset and Contains methods, you are safe to
// use channels instead.
//
// It supports operations for retrieving and adding elements to a FIFO queue.
// If there are no elements available the retrieve operations wait until
// elements are added to the queue.
type Blocking[T comparable] struct {
	// elements queue

	// elements is the backing slice; new elements are appended to its tail.
	elements []T
	// elementsIndex is the position of the current head inside elements.
	// Removal advances this index instead of deleting from the slice.
	elementsIndex int
	// initialLen is the element count recorded by NewBlocking; Reset
	// re-slices elements to this length.
	initialLen int
	// capacity is the optional upper bound on queued elements; when it is
	// nil the queue is never reported as full.
	capacity *int

	// synchronization

	// lock guards the fields above; both condition variables are built on it.
	lock sync.RWMutex
	// notEmptyCond is signalled when an element is added and broadcast on
	// Reset; the waiting retrieve/peek operations block on it.
	notEmptyCond *sync.Cond
	// notFullCond is signalled by the removal operations; OfferWait blocks
	// on it while the queue is full.
	notFullCond *sync.Cond
}
// NewBlocking returns a new Blocking Queue containing the given elements.
//
// opts are applied in order on top of the default configuration, which has
// no capacity limit. When a capacity is configured and elems holds more
// elements than that, only the first capacity elements are kept.
//
// The queue uses elems as its initial storage without copying the elements.
// The slice capacity is clipped, so elements offered later are stored in
// newly allocated memory instead of overwriting the caller's backing array.
func NewBlocking[T comparable](
	elems []T,
	opts ...Option,
) *Blocking[T] {
	cfg := options{
		capacity: nil,
	}

	for _, o := range opts {
		o.apply(&cfg)
	}

	// Truncate before recording initialLen: Reset re-slices the elements to
	// initialLen, so it must never exceed the configured capacity.
	if cfg.capacity != nil && len(elems) > *cfg.capacity {
		elems = elems[:*cfg.capacity]
	}

	// Full slice expression: force the first append to reallocate rather
	// than write past len(elems) into memory the caller may still be using.
	elems = elems[:len(elems):len(elems)]

	queue := &Blocking[T]{
		elements:      elems,
		elementsIndex: 0,
		initialLen:    len(elems),
		capacity:      cfg.capacity,
		lock:          sync.RWMutex{},
	}

	// Both conditions share the queue lock so waiters can re-check the queue
	// state atomically after being woken up.
	queue.notEmptyCond = sync.NewCond(&queue.lock)
	queue.notFullCond = sync.NewCond(&queue.lock)

	return queue
}
// ==================================Insertion=================================
// OfferWait appends elem to the tail of the queue.
// While the queue is full it blocks until a slot becomes available.
func (bq *Blocking[T]) OfferWait(elem T) {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	// Re-check the predicate after every wake-up: another producer may have
	// taken the freed slot in the meantime.
	for {
		if !bq.isFull() {
			break
		}

		bq.notFullCond.Wait()
	}

	bq.elements = append(bq.elements, elem)

	// Let one blocked consumer know an element is available.
	bq.notEmptyCond.Signal()
}
// Offer appends elem to the tail of the queue without blocking.
// It returns ErrQueueIsFull when the queue has no free slot, nil otherwise.
func (bq *Blocking[T]) Offer(elem T) error {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	if !bq.isFull() {
		bq.elements = append(bq.elements, elem)

		// Let one blocked consumer know an element is available.
		bq.notEmptyCond.Signal()

		return nil
	}

	return ErrQueueIsFull
}
// Reset puts the queue back in its initial state: the head index is set to
// 0 and the elements slice is cut back to its initial length, which drops
// everything offered after construction and makes the initial elements
// available again.
//
// Goroutines blocked waiting for an element and goroutines blocked waiting
// for a free slot are both woken up so they can re-evaluate the queue state.
func (bq *Blocking[T]) Reset() {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	bq.elementsIndex = 0
	bq.elements = bq.elements[:bq.initialLen]

	// The initial elements can be retrieved again: wake blocked consumers.
	bq.notEmptyCond.Broadcast()

	// Resetting may shrink a full queue below its capacity. Without this
	// broadcast, producers blocked in OfferWait would stay asleep until some
	// later removal happened to signal them.
	bq.notFullCond.Broadcast()
}
// ===================================Removal==================================
// GetWait removes and returns the head of the queue.
// When the queue is empty it blocks until an element becomes available.
//
// The element is not physically deleted from the underlying slice; the
// head index is advanced past it instead.
func (bq *Blocking[T]) GetWait() (v T) {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	// Re-check after every wake-up: another consumer may have taken the
	// element that triggered the signal.
	for bq.isEmpty() {
		bq.notEmptyCond.Wait()
	}

	v = bq.elements[bq.elementsIndex]
	bq.elementsIndex++

	// A slot was freed: let one blocked producer proceed.
	bq.notFullCond.Signal()

	return v
}
// Get removes and returns the head of the queue without blocking.
// When the queue is empty it returns the zero value of T together with
// ErrNoElementsAvailable.
//
// The element is not physically deleted from the underlying slice; the
// head index is advanced past it instead.
func (bq *Blocking[T]) Get() (v T, _ error) {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	head, err := bq.get()

	return head, err
}
// Clear removes and returns all elements currently in the queue, in FIFO
// order, and wakes every goroutine waiting for a free slot.
//
// The returned slice is a view of the queue's internal storage, not a copy.
// Its capacity is clipped to its length so that appending to it allocates
// new memory instead of overwriting the slots the queue will use for
// elements offered afterwards.
func (bq *Blocking[T]) Clear() []T {
	bq.lock.Lock()
	defer bq.lock.Unlock()
	defer bq.notFullCond.Broadcast()

	end := len(bq.elements)

	// Three-index slice: same elements as elements[elementsIndex:], but
	// without access to the spare capacity of the backing array.
	removed := bq.elements[bq.elementsIndex:end:end]

	// Advance the head past everything that was handed out.
	bq.elementsIndex = end

	return removed
}
// Iterator drains the queue and returns its elements through a channel.
//
// Every element still in the queue is removed and sent, in FIFO order, to a
// channel buffered to the current queue size. The channel is already closed
// when it is returned, so ranging over it ends once the drained elements
// have been read.
func (bq *Blocking[T]) Iterator() <-chan T {
	// The write lock is required: get advances elementsIndex, so holding
	// only the read lock would race with other readers and iterators.
	bq.lock.Lock()
	defer bq.lock.Unlock()

	// use a buffered channel to avoid blocking the iterator.
	iteratorCh := make(chan T, bq.size())

	// close the channel when the function returns.
	defer close(iteratorCh)

	// move the elements to the channel until the queue reports it is empty.
	for {
		elem, err := bq.get()
		if err != nil {
			break
		}

		iteratorCh <- elem
	}

	return iteratorCh
}
// =================================Examination================================
// Peek returns the head of the queue without removing it.
// When the queue is empty it returns the zero value of T together with
// ErrNoElementsAvailable.
func (bq *Blocking[T]) Peek() (v T, _ error) {
	bq.lock.RLock()
	defer bq.lock.RUnlock()

	if !bq.isEmpty() {
		return bq.elements[bq.elementsIndex], nil
	}

	return v, ErrNoElementsAvailable
}
// PeekWait returns the head of the queue without removing it.
// When the queue is empty it blocks until an element becomes available.
func (bq *Blocking[T]) PeekWait() T {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	for {
		if !bq.isEmpty() {
			break
		}

		bq.notEmptyCond.Wait()
	}

	head := bq.elements[bq.elementsIndex]

	// The element stays in the queue, so pass the wake-up on in case a
	// removing method is waiting for it as well.
	bq.notEmptyCond.Signal()

	return head
}
// Size reports how many elements are currently waiting in the queue.
func (bq *Blocking[T]) Size() int {
	bq.lock.RLock()
	defer bq.lock.RUnlock()

	// Elements before elementsIndex have already been removed.
	return len(bq.elements) - bq.elementsIndex
}
// Contains reports whether elem is currently in the queue.
// Only elements that have not been removed yet are considered.
func (bq *Blocking[T]) Contains(elem T) bool {
	bq.lock.RLock()
	defer bq.lock.RUnlock()

	// Range over the values of the live window. Indexing bq.elements with
	// the window-relative index would inspect already removed elements and
	// skip the tail of the queue.
	for _, queued := range bq.elements[bq.elementsIndex:] {
		if queued == elem {
			return true
		}
	}

	return false
}
// IsEmpty reports whether the queue currently holds no elements.
func (bq *Blocking[T]) IsEmpty() bool {
	bq.lock.RLock()
	defer bq.lock.RUnlock()

	// The queue is empty once the head index has moved past the last element.
	return bq.elementsIndex >= len(bq.elements)
}
// ===================================Helpers==================================
// getNextIndexOrWait returns the index of the current head element, waiting
// on notEmptyCond for as long as the queue is empty.
// It does not lock; its caller in this file holds the queue lock.
func (bq *Blocking[T]) getNextIndexOrWait() int {
	for bq.isEmpty() {
		bq.notEmptyCond.Wait()
	}

	return bq.elementsIndex
}
// isEmpty reports whether no elements are left between the head index and
// the end of the elements slice. It does no locking of its own.
func (bq *Blocking[T]) isEmpty() bool {
	remaining := len(bq.elements) - bq.elementsIndex

	return remaining <= 0
}
// isFull reports whether the number of queued elements has reached the
// configured capacity. A queue without a capacity is never full.
// It does no locking of its own.
func (bq *Blocking[T]) isFull() bool {
	if limit := bq.capacity; limit != nil {
		queued := len(bq.elements) - bq.elementsIndex

		return queued >= *limit
	}

	return false
}
// size returns the number of elements between the head index and the end of
// the elements slice. It does no locking of its own.
func (bq *Blocking[T]) size() int {
	head, tail := bq.elementsIndex, len(bq.elements)

	return tail - head
}
// get removes and returns the head of the queue by advancing the head index.
// When the queue is empty it returns the zero value of T together with
// ErrNoElementsAvailable. notFullCond is signalled on every call, whether or
// not an element was removed. It does no locking of its own.
func (bq *Blocking[T]) get() (v T, _ error) {
	defer bq.notFullCond.Signal()

	if !bq.isEmpty() {
		v = bq.elements[bq.elementsIndex]
		bq.elementsIndex++

		return v, nil
	}

	return v, ErrNoElementsAvailable
}