Skip to content

Commit 7f571e7

Browse files
Better handling of waiter invalidation in PriorityQueue. (#419)
Add waiter invalidation to prevent abandoned waiters from accumulating in the `@waiting` heap when exceptions occur during dequeue operations. - Add `Waiter#invalidate!` method that nullifies fiber and condition. - Add `Waiter#valid?` method to check if waiter is still usable. - Add ensure block in dequeue to invalidate waiters on exceptions.
1 parent f75b823 commit 7f571e7

File tree

2 files changed

+105
-16
lines changed

2 files changed

+105
-16
lines changed

lib/async/priority_queue.rb

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ def wait_for_value(mutex)
4343
condition.wait(mutex)
4444
return self.value
4545
end
46+
47+
# Invalidate this waiter, making it unusable and detectable as abandoned.
48+
def invalidate!
49+
self.fiber = nil
50+
end
51+
52+
# Check if this waiter has been invalidated.
53+
def valid?
54+
self.fiber&.alive?
55+
end
4656
end
4757

4858
# Create a new priority queue.
@@ -64,12 +74,9 @@ def close
6474
@mutex.synchronize do
6575
@closed = true
6676

67-
# Signal all waiting fibers with nil, skipping dead ones:
77+
# Signal all waiting fibers with nil, skipping dead/invalid ones:
6878
while waiter = @waiting.pop
69-
if waiter.fiber.alive?
70-
waiter.signal(nil)
71-
end
72-
# Dead waiter discarded, continue to next one.
79+
waiter.signal(nil)
7380
end
7481
end
7582
end
@@ -105,14 +112,14 @@ def push(item)
105112

106113
@items << item
107114

108-
# Wake up the highest priority waiter if any, skipping dead waiters:
115+
# Wake up the highest priority waiter if any, skipping dead/invalid waiters:
109116
while waiter = @waiting.pop
110-
if waiter.fiber.alive?
117+
if waiter.valid?
111118
value = @items.shift
112119
waiter.signal(value)
113120
break
114121
end
115-
# Dead waiter discarded, try next one.
122+
# Dead/invalid waiter discarded, try next one.
116123
end
117124
end
118125
end
@@ -133,13 +140,13 @@ def enqueue(*items)
133140

134141
@items.concat(items)
135142

136-
# Wake up waiting fibers in priority order, skipping dead waiters:
143+
# Wake up waiting fibers in priority order, skipping dead/invalid waiters:
137144
while !@items.empty? && (waiter = @waiting.pop)
138-
if waiter.fiber.alive?
145+
if waiter.valid?
139146
value = @items.shift
140147
waiter.signal(value)
141148
end
142-
# Dead waiter discarded, continue to next one.
149+
# Dead/invalid waiter discarded, continue to next one.
143150
end
144151
end
145152
end
@@ -172,12 +179,16 @@ def dequeue(priority: 0)
172179
@sequence += 1
173180

174181
condition = ConditionVariable.new
175-
waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
176-
@waiting.push(waiter)
177182

178-
# Wait for our specific condition variable to be signaled:
179-
# The mutex is released during wait, reacquired after:
180-
return waiter.wait_for_value(@mutex)
183+
begin
184+
waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
185+
@waiting.push(waiter)
186+
187+
# Wait for our specific condition variable to be signaled:
188+
return waiter.wait_for_value(@mutex)
189+
ensure
190+
waiter&.invalidate!
191+
end
181192
end
182193
end
183194

test/async/priority_queue.rb

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,4 +547,82 @@
547547
]
548548
end
549549
end
550+
551+
with "waiter invalidation" do
552+
it "should invalidate waiters when tasks are stopped to prevent memory leaks" do
553+
# Start a task that will wait and then be stopped
554+
task = reactor.async do
555+
queue.dequeue(priority: 1)
556+
end
557+
558+
expect(queue.waiting).to be == 1
559+
560+
# Stop the task (simulates exception)
561+
task.stop
562+
task.wait
563+
564+
# Now enqueue an item - should not try to wake the invalid waiter
565+
queue.enqueue("test_item")
566+
567+
# The item should still be available for a new waiter
568+
result = nil
569+
new_task = reactor.async do
570+
result = queue.dequeue
571+
end
572+
573+
new_task.wait
574+
expect(result).to be == "test_item"
575+
end
576+
577+
it "should skip invalid waiters during enqueue" do
578+
received_items = []
579+
580+
# Start multiple waiters
581+
tasks = []
582+
3.times do |i|
583+
tasks << reactor.async do
584+
item = queue.dequeue(priority: i)
585+
received_items << [i, item]
586+
end
587+
end
588+
589+
# Give tasks time to start waiting
590+
expect(queue.waiting).to be == 3
591+
592+
# Stop the middle priority task (priority 1)
593+
tasks[1].stop
594+
tasks[1].wait
595+
596+
# Add items to the queue
597+
queue.enqueue("item1", "item2")
598+
599+
tasks[0].wait
600+
tasks[2].wait
601+
602+
# Should have received items in the valid waiters only
603+
# Invalid waiter (priority 1) should be skipped
604+
expect(received_items.size).to be == 2
605+
606+
# Items should go to highest priority waiters (2, then 0)
607+
priorities_served = received_items.map(&:first).sort.reverse
608+
expect(priorities_served).to be == [2, 0]
609+
end
610+
end
611+
612+
describe Async::PriorityQueue::Waiter do
613+
it "should invalidate correctly" do
614+
condition = ConditionVariable.new
615+
fiber = Fiber.current
616+
waiter = Async::PriorityQueue::Waiter.new(fiber, 1, 1, condition, nil)
617+
618+
expect(waiter).to be(:valid?)
619+
expect(waiter.fiber).to be == fiber
620+
expect(waiter.condition).to be == condition
621+
622+
waiter.invalidate!
623+
624+
expect(waiter).not.to be(:valid?)
625+
expect(waiter.fiber).to be_nil
626+
end
627+
end
550628
end

0 commit comments

Comments
 (0)