Skip to content

Commit b0491bf

Browse files
committed
Introduce Queue#waiting_count and PriorityQueue#waiting_count.
Generally for statistics purposes only.
1 parent 8617afa commit b0491bf

File tree

5 files changed

+49
-25
lines changed

5 files changed

+49
-25
lines changed

fixtures/async/a_queue.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ module Async
4141
expect(queue.pop).to be == :item
4242
expect(queue.size).to be == 0
4343
end
44+
45+
it "can block until an item is available" do
46+
child = reactor.async do
47+
queue.pop
48+
end
49+
50+
expect(queue).to have_attributes(size: be == 0, waiting_count: be == 1)
51+
52+
queue.push(:item)
53+
54+
expect(child.wait).to be == :item
55+
end
4456
end
4557

4658
with "#each" do

lib/async/priority_queue.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,15 @@ def empty?
102102
end
103103

104104
# @returns [Integer] The number of fibers waiting to dequeue.
105-
def waiting
105+
def waiting_count
106106
@mutex.synchronize do
107107
@waiting.size
108108
end
109109
end
110110

111+
# @deprecated Use {#waiting_count} instead.
112+
alias waiting waiting_count
113+
111114
# Add an item to the queue.
112115
#
113116
# @parameter item [Object] The item to add to the queue.

lib/async/queue.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ def empty?
5353
@delegate.empty?
5454
end
5555

56+
# @returns [Integer] The number of tasks waiting for an item.
57+
def waiting_count
58+
@delegate.num_waiting
59+
end
60+
5661
# Add an item to the queue.
5762
def push(item)
5863
@delegate.push(item)

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Introduce `Queue#waiting_count` and `PriorityQueue#waiting_count`. Generally for statistics/testing purposes only.
6+
37
## v2.31.0
48

59
- Introduce `Async::Deadline` for precise timeout management in compound operations.

test/async/priority_queue.rb

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
end
9393

9494
# Confirm all consumers are waiting:
95-
expect(queue.waiting).to be == 3
95+
expect(queue.waiting_count).to be == 3
9696

9797
# Add items:
9898
queue.push(:item1)
@@ -134,7 +134,7 @@
134134
end
135135

136136
# Confirm all consumers are waiting:
137-
expect(queue.waiting).to be == 4
137+
expect(queue.waiting_count).to be == 4
138138

139139
# Add items:
140140
4.times {|i| queue.push("item#{i}")}
@@ -162,7 +162,7 @@
162162
results << [:high, queue.dequeue(priority: 10)]
163163
end
164164

165-
expect(queue.waiting).to be == 2
165+
expect(queue.waiting_count).to be == 2
166166

167167
# Add one item - should go to high priority consumer:
168168
queue.push(:item)
@@ -198,7 +198,7 @@
198198
end
199199

200200
# Confirm low priority consumer is waiting:
201-
expect(queue.waiting).to be == 1
201+
expect(queue.waiting_count).to be == 1
202202

203203
# Add an item - now we have waiters:
204204
queue.push(:available_item)
@@ -209,15 +209,15 @@
209209
end
210210

211211
# Confirm second low priority consumer is waiting (first one got the item):
212-
expect(queue.waiting).to be == 1
212+
expect(queue.waiting_count).to be == 1
213213

214214
# Now a high priority consumer should jump ahead of remaining waiters:
215215
high = reactor.async do
216216
results << [:high, queue.dequeue(priority: 10)]
217217
end
218218

219219
# Confirm high priority consumer is also waiting (total 2 waiting):
220-
expect(queue.waiting).to be == 2
220+
expect(queue.waiting_count).to be == 2
221221

222222
# Add more items to satisfy all waiters:
223223
queue.push(:item2)
@@ -248,7 +248,7 @@
248248
end
249249

250250
# Confirm low priority waiter got item1 and finished:
251-
expect(queue.waiting).to be == 0
251+
expect(queue.waiting_count).to be == 0
252252

253253
# The low priority waiter should have taken item1.
254254
# High priority consumer gets item2:
@@ -261,21 +261,21 @@
261261

262262
with "#waiting" do
263263
it "returns the number of waiting fibers" do
264-
expect(queue.waiting).to be == 0
264+
expect(queue.waiting_count).to be == 0
265265

266266
task1 = reactor.async {queue.dequeue}
267-
expect(queue.waiting).to be == 1
267+
expect(queue.waiting_count).to be == 1
268268

269269
task2 = reactor.async {queue.dequeue}
270-
expect(queue.waiting).to be == 2
270+
expect(queue.waiting_count).to be == 2
271271

272272
queue.push(:item)
273273
task1.wait
274-
expect(queue.waiting).to be == 1
274+
expect(queue.waiting_count).to be == 1
275275

276276
queue.push(:item)
277277
task2.wait
278-
expect(queue.waiting).to be == 0
278+
expect(queue.waiting_count).to be == 0
279279
end
280280
end
281281

@@ -297,7 +297,7 @@
297297
end
298298

299299
# Confirm both async tasks are waiting:
300-
expect(queue.waiting).to be == 2
300+
expect(queue.waiting_count).to be == 2
301301

302302
# Add items:
303303
queue.push(:item1)
@@ -330,7 +330,7 @@
330330
end
331331

332332
# Confirm all waiters are ready:
333-
expect(queue.waiting).to be == 3
333+
expect(queue.waiting_count).to be == 3
334334

335335
# Add multiple items at once:
336336
queue.enqueue(:item1, :item2, :item3)
@@ -360,7 +360,7 @@
360360
end
361361

362362
# Confirm iterator is waiting:
363-
expect(queue.waiting).to be == 1
363+
expect(queue.waiting_count).to be == 1
364364

365365
# Add items and nil to terminate:
366366
queue.push(:first)
@@ -394,7 +394,7 @@
394394
task = reactor.async {queue.dequeue(priority: 5)}
395395

396396
# Confirm waiter is ready:
397-
expect(queue.waiting).to be == 1
397+
expect(queue.waiting_count).to be == 1
398398

399399
# Close the queue:
400400
queue.close
@@ -437,7 +437,7 @@
437437
end
438438

439439
# Confirm all consumers are waiting:
440-
expect(queue.waiting).to be == num_consumers
440+
expect(queue.waiting_count).to be == num_consumers
441441

442442
# Add items:
443443
num_items.times {|i| queue.push("item#{i}")}
@@ -467,7 +467,7 @@
467467
task = reactor.async {queue.dequeue(priority: 5)}
468468

469469
# Confirm waiter is waiting:
470-
expect(queue.waiting).to be == 1
470+
expect(queue.waiting_count).to be == 1
471471

472472
# Stop the waiting task:
473473
task.stop
@@ -485,7 +485,7 @@
485485
task = reactor.async {queue.dequeue(priority: 5)}
486486

487487
# Confirm waiter is waiting:
488-
expect(queue.waiting).to be == 1
488+
expect(queue.waiting_count).to be == 1
489489

490490
# Stop the waiting task:
491491
task.stop
@@ -507,7 +507,7 @@
507507
task3 = reactor.async {results << [:task3, queue.dequeue(priority: 1)]}
508508

509509
# Confirm all three are waiting:
510-
expect(queue.waiting).to be == 3
510+
expect(queue.waiting_count).to be == 3
511511

512512
# Stop first two tasks:
513513
task1.stop
@@ -522,7 +522,7 @@
522522
# BUG: Currently stopped waiters consume items:
523523
expect(results).to be == [[:task3, :item1]] # Should get first item
524524
expect(queue.size).to be == 1 # Second item should remain
525-
expect(queue.waiting).to be == 0 # No waiters should remain
525+
expect(queue.waiting_count).to be == 0 # No waiters should remain
526526
end
527527

528528
it "maintains correct priority order with stopped waiters" do
@@ -534,7 +534,7 @@
534534
medium_task = reactor.async {results << [:medium, queue.dequeue(priority: 5)]}
535535

536536
# Confirm all are waiting:
537-
expect(queue.waiting).to be == 3
537+
expect(queue.waiting_count).to be == 3
538538

539539
# Stop the high priority waiter (should have been first):
540540
high_task.stop
@@ -560,7 +560,7 @@
560560
queue.dequeue(priority: 1)
561561
end
562562

563-
expect(queue.waiting).to be == 1
563+
expect(queue.waiting_count).to be == 1
564564

565565
# Stop the task (simulates exception)
566566
task.stop
@@ -592,7 +592,7 @@
592592
end
593593

594594
# Give tasks time to start waiting
595-
expect(queue.waiting).to be == 3
595+
expect(queue.waiting_count).to be == 3
596596

597597
# Stop the middle priority task (priority 1)
598598
tasks[1].stop

0 commit comments

Comments
 (0)