Skip to content

Fix handling of IO#close interruption across threads/fibers. #369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def block(blocker, timeout)
# @parameter blocker [Object] The object that was blocking the fiber.
# @parameter fiber [Fiber] The fiber to unblock.
def unblock(blocker, fiber)
# $stderr.puts "unblock(#{blocker}, #{fiber})"
# Fiber.blocking{$stderr.puts "unblock(#{blocker}, #{fiber})"}

# This operation is protected by the GVL:
if selector = @selector
Expand All @@ -250,6 +250,8 @@ def unblock(blocker, fiber)
#
# @parameter duration [Numeric | Nil] The time in seconds to sleep, or if nil, indefinitely.
def kernel_sleep(duration = nil)
# Fiber.blocking{$stderr.puts "kernel_sleep(#{duration}, #{Fiber.current})"}

if duration
self.block(nil, duration)
else
Expand Down Expand Up @@ -348,6 +350,34 @@ def io_write(io, buffer, length, offset = 0)
end
end

# Used to defer stopping the current task until later.
class FiberInterrupt
# Create a new stop later operation.
#
# @parameter task [Task] The task to stop later.
def initialize(fiber, exception)
@fiber = fiber
@exception = exception
end

# @returns [Boolean] Whether the task is alive.
def alive?
@fiber.alive?
end

# Transfer control to the operation - this will stop the task.
def transfer
# Fiber.blocking{$stderr.puts "FiberInterrupt#transfer(#{@fiber}, #{@exception})"}
@fiber.raise(@exception)
end
end

# Raise an exception on the specified fiber, waking up the event loop if necessary.
def fiber_interrupt(fiber, exception)
# Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"}
unblock(nil, FiberInterrupt.new(fiber, exception))
end

# Wait for the specified process ID to exit.
#
# @public Since *Async v2*.
Expand Down
26 changes: 26 additions & 0 deletions test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env ruby

require_relative 'lib/async'

Async do
r, w = IO.pipe

read_thread = Thread.new do
Thread.current.report_on_exception = false
r.read(5)
end

# Wait until read_thread blocks on I/O
Thread.pass until read_thread.status == "sleep"

close_task = Async do
r.close
end

close_task.wait
begin
read_thread.join
rescue => error
puts "Caught exception: #{error.class} - #{error.message}"
end
end
134 changes: 134 additions & 0 deletions test/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,138 @@
out.close
end
end

with "#close" do
it "can interrupt reading fiber when closing" do
skip_unless_minimum_ruby_version("3.5")

r, w = IO.pipe

read_task = Async do
expect do
r.read(5)
end.to raise_exception(IOError, message: be =~ /stream closed/)
end

r.close
read_task.wait
end

it "can interrupt reading fiber when closing from another fiber" do
skip_unless_minimum_ruby_version("3.5")

r, w = IO.pipe

read_task = Async do
expect do
r.read(5)
end.to raise_exception(IOError, message: be =~ /stream closed/)
end

close_task = Async do
r.close
end

close_task.wait
read_task.wait
end

it "can interrupt reading fiber when closing from a new thread" do
skip_unless_minimum_ruby_version("3.5")

r, w = IO.pipe

read_task = Async do
expect do
r.read(5)
end.to raise_exception(IOError, message: be =~ /stream closed/)
end

close_thread = Thread.new do
r.close
end

close_thread.value
read_task.wait
end

it "can interrupt reading fiber when closing from a fiber in a new thread" do
skip_unless_minimum_ruby_version("3.5")

r, w = IO.pipe

read_task = Async do
expect do
r.read(5)
end.to raise_exception(IOError, message: be =~ /stream closed/)
end

close_thread = Thread.new do
close_task = Async do
r.close
end
close_task.wait
end

close_thread.value
read_task.wait
end

it "can interrupt reading thread when closing from a fiber" do
skip_unless_minimum_ruby_version("3.5")

$stderr.puts "---------------------------------"
r, w = IO.pipe

read_thread = Thread.new do
Thread.current.report_on_exception = false
puts "Reading in thread #{Thread.current}"
r.read(5)
ensure
puts "Thread #{Thread.current} finished reading"
end

# Wait until read_thread blocks on I/O
Thread.pass until read_thread.status == "sleep"

close_task = Async do
puts "Closing in fiber #{Thread.current}"
r.close
ensure
puts "Closed in fiber #{Thread.current}"
end

close_task.wait

expect do
read_thread.join
end.to raise_exception(IOError, message: be =~ /stream closed/)
end

it "can interrupt reading fiber in a new thread when closing from a fiber" do
skip_unless_minimum_ruby_version("3.5")

r, w = IO.pipe

read_thread = Thread.new do
Thread.current.report_on_exception = false
read_task = Async do
expect do
r.read(5)
end.to raise_exception(IOError, message: be =~ /stream closed/)
end
read_task.wait
end

# Wait until read_thread blocks on I/O
Thread.pass until read_thread.status == "sleep"

close_task = Async do
r.close
end
close_task.wait

read_thread.value
end
end
end
Loading