Skip to content

Commit 94f3ea6

Browse files
committed
io: corrected error handling for streams
After #32309, we became much more eager to forget about normal errors and more eager to throw an EOFError at the wrong place. This is intended to fix that oversight.
1 parent 67cdc55 commit 94f3ea6

File tree

2 files changed

+94
-78
lines changed

2 files changed

+94
-78
lines changed

base/stream.jl

Lines changed: 81 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,16 @@ abstract type LibuvStream <: IO end
4444
bytesavailable(s::LibuvStream) = bytesavailable(s.buffer)
4545

4646
function eof(s::LibuvStream)
47-
if isopen(s) # fast path
48-
bytesavailable(s) <= 0 || return false
49-
else
50-
return bytesavailable(s) <= 0
51-
end
47+
bytesavailable(s) > 0 && return false
5248
wait_readnb(s, 1)
53-
return !isopen(s) && bytesavailable(s) <= 0
49+
# This function is race-y if used from multiple threads, but we guarantee
50+
# it to never return false until the stream is definitively exhausted
51+
# and that we won't return true if there's a readerror pending (it'll instead get thrown).
52+
# This requires some careful ordering here (TODO: atomic loads)
53+
bytesavailable(s) > 0 && return false
54+
open = isopen(s) # must preceed readerror check
55+
s.readerror === nothing || throw(s.readerror)
56+
return !open
5457
end
5558

5659
# Limit our default maximum read and buffer size,
@@ -327,17 +330,25 @@ function check_open(x::Union{LibuvStream, LibuvServer})
327330
end
328331

329332
function wait_readnb(x::LibuvStream, nb::Int)
333+
# fast path before iolock acquire
334+
bytesavailable(x.buffer) >= nb && return
335+
open = isopen(x) # must preceed readerror check
336+
x.readerror === nothing || throw(x.readerror)
337+
open || return
330338
iolock_begin()
331-
if !isopen(x) || bytesavailable(x.buffer) >= nb # fast path
332-
iolock_end()
333-
return
334-
end
339+
# repeat fast path after iolock acquire, before other expensive work
340+
bytesavailable(x.buffer) >= nb && (iolock_end(); return)
341+
open = isopen(x)
342+
x.readerror === nothing || throw(x.readerror)
343+
open || (iolock_end(); return)
344+
# now do the "real" work
335345
oldthrottle = x.throttle
336346
preserve_handle(x)
337347
lock(x.cond)
338348
try
339-
while isopen(x) && bytesavailable(x.buffer) < nb
349+
while bytesavailable(x.buffer) < nb
340350
x.readerror === nothing || throw(x.readerror)
351+
isopen(x) || break
341352
x.throttle = max(nb, x.throttle)
342353
start_reading(x) # ensure we are reading
343354
iolock_end()
@@ -351,6 +362,8 @@ function wait_readnb(x::LibuvStream, nb::Int)
351362
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
352363
end
353364
if oldthrottle <= x.throttle <= nb
365+
# if we're interleaving readers, we might not get back to the "original" throttle
366+
# but we consider that an acceptable "risk", since we can't be quite sure what the intended value is now
354367
x.throttle = oldthrottle
355368
end
356369
unpreserve_handle(x)
@@ -543,7 +556,6 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
543556
# remind the client that stream.buffer is full
544557
notify(stream.cond)
545558
elseif nread == UV_EOF
546-
stream.readerror = EOFError()
547559
if isa(stream, TTY)
548560
stream.status = StatusEOF # libuv called uv_stop_reading already
549561
notify(stream.cond)
@@ -589,7 +601,6 @@ function reseteof(x::TTY)
589601
iolock_begin()
590602
if x.status == StatusEOF
591603
x.status = StatusOpen
592-
x.readerror isa EOFError && (x.readerror = nothing)
593604
end
594605
iolock_end()
595606
nothing
@@ -772,40 +783,33 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
772783
@assert sbuf.seekable == false
773784
@assert sbuf.maxsize >= nb
774785

775-
local nread
776-
if bytesavailable(sbuf) >= nb
777-
nread = readbytes!(sbuf, a, nb)
778-
iolock_end()
779-
return nread
780-
end
781-
782-
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
783-
while isopen(s) && bytesavailable(sbuf) < nb
786+
function wait_locked(s, buf, nb)
787+
while bytesavailable(buf) < nb
788+
s.readerror === nothing || throw(s.readerror)
789+
isopen(s) || break
784790
iolock_end()
785791
wait_readnb(s, nb)
786792
iolock_begin()
787793
end
788-
nread = readbytes!(sbuf, a, nb)
789-
iolock_end()
790-
return nread
791794
end
792795

793-
nread = try
794-
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
795-
newbuf = PipeBuffer(a, maxsize = nb)
796+
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
797+
wait_locked(s, sbuf, nb)
798+
end
799+
if bytesavailable(sbuf) >= nb
800+
nread = readbytes!(sbuf, a, nb)
801+
else
802+
newbuf = PipeBuffer(a, maxsize=nb)
796803
newbuf.size = 0 # reset the write pointer to the beginning
797-
s.buffer = newbuf
798-
write(newbuf, sbuf)
799-
iolock_end()
800-
wait_readnb(s, Int(nb))
801-
iolock_begin()
802-
compact(newbuf)
803-
bytesavailable(newbuf)
804-
finally
805-
s.buffer = sbuf
806-
if !isempty(s.cond)
807-
start_reading(s) # resume reading iff there are currently other read clients of the stream
804+
nread = try
805+
s.buffer = newbuf
806+
write(newbuf, sbuf)
807+
wait_locked(s, newbuf, nb)
808+
bytesavailable(newbuf)
809+
finally
810+
s.buffer = sbuf
808811
end
812+
compact(newbuf)
809813
end
810814
iolock_end()
811815
return nread
@@ -825,31 +829,30 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
825829
@assert sbuf.seekable == false
826830
@assert sbuf.maxsize >= nb
827831

828-
if bytesavailable(sbuf) >= nb
829-
unsafe_read(sbuf, p, nb)
830-
elseif nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
831-
while isopen(s) && bytesavailable(sbuf) < nb
832+
function wait_locked(s, buf, nb)
833+
while bytesavailable(buf) < nb
834+
s.readerror === nothing || throw(s.readerror)
835+
isopen(s) || throw(EOFError())
832836
iolock_end()
833-
wait_readnb(s, Int(nb))
837+
wait_readnb(s, nb)
834838
iolock_begin()
835839
end
840+
end
841+
842+
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
843+
wait_locked(s, sbuf, Int(nb))
844+
end
845+
if bytesavailable(sbuf) >= nb
836846
unsafe_read(sbuf, p, nb)
837847
else
848+
newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize=Int(nb))
849+
newbuf.size = 0 # reset the write pointer to the beginning
838850
try
839-
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
840-
newbuf = PipeBuffer(unsafe_wrap(Array, p, nb), maxsize = Int(nb))
841-
newbuf.size = 0 # reset the write pointer to the beginning
842851
s.buffer = newbuf
843852
write(newbuf, sbuf)
844-
iolock_end()
845-
wait_readnb(s, Int(nb))
846-
iolock_begin()
847-
nb == bytesavailable(newbuf) || throw(EOFError())
853+
wait_locked(s, newbuf, Int(nb))
848854
finally
849855
s.buffer = sbuf
850-
if !isempty(s.cond)
851-
start_reading(s) # resume reading iff there are currently other read clients of the stream
852-
end
853856
end
854857
end
855858
iolock_end()
@@ -860,9 +863,9 @@ function read(this::LibuvStream, ::Type{UInt8})
860863
iolock_begin()
861864
sbuf = this.buffer
862865
@assert sbuf.seekable == false
863-
while isopen(this) && bytesavailable(sbuf) < 1
866+
while bytesavailable(sbuf) < 1
864867
iolock_end()
865-
wait_readnb(this, 1)
868+
eof(this) && throw(EOFError())
866869
iolock_begin()
867870
end
868871
c = read(sbuf, UInt8)
@@ -871,7 +874,7 @@ function read(this::LibuvStream, ::Type{UInt8})
871874
end
872875

873876
function readavailable(this::LibuvStream)
874-
wait_readnb(this, 1)
877+
wait_readnb(this, 1) # unlike the other `read` family of functions, this one doesn't guarantee error reporting
875878
iolock_begin()
876879
buf = this.buffer
877880
@assert buf.seekable == false
@@ -884,25 +887,29 @@ function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false)
884887
iolock_begin()
885888
buf = x.buffer
886889
@assert buf.seekable == false
887-
if isopen(x) && !occursin(c, buf) # fast path
888-
preserve_handle(x)
889-
lock(x.cond)
890-
try
891-
while isopen(x) && !occursin(c, x.buffer)
892-
x.readerror === nothing || throw(x.readerror)
893-
start_reading(x) # ensure we are reading
894-
iolock_end()
895-
wait(x.cond)
890+
if !occursin(c, buf) # fast path checks first
891+
x.readerror === nothing || throw(x.readerror)
892+
if isopen(x)
893+
preserve_handle(x)
894+
lock(x.cond)
895+
try
896+
while !occursin(c, x.buffer)
897+
x.readerror === nothing || throw(x.readerror)
898+
isopen(x) || break
899+
start_reading(x) # ensure we are reading
900+
iolock_end()
901+
wait(x.cond)
902+
unlock(x.cond)
903+
iolock_begin()
904+
lock(x.cond)
905+
end
906+
finally
907+
if isempty(x.cond)
908+
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
909+
end
896910
unlock(x.cond)
897-
iolock_begin()
898-
lock(x.cond)
911+
unpreserve_handle(x)
899912
end
900-
finally
901-
if isempty(x.cond)
902-
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
903-
end
904-
unlock(x.cond)
905-
unpreserve_handle(x)
906913
end
907914
end
908915
bytes = readuntil(buf, c, keep=keep)

stdlib/REPL/test/repl.jl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -750,11 +750,20 @@ let exename = Base.julia_cmd()
750750
@test output == "1\r\nexit()\r\n1\r\n\r\njulia> "
751751
end
752752
@test bytesavailable(pty_master) == 0
753-
@test try # possibly consume child-exited notification
754-
eof(pty_master)
755-
catch ex
756-
(ex isa Base.IOError && ex.code == Base.UV_EIO) || rethrow()
753+
@test if Sys.iswindows() || Sys.isbsd()
757754
eof(pty_master)
755+
else
756+
# Some platforms (such as linux) report EIO instead of EOF
757+
# possibly consume child-exited notification
758+
# for example, see discussion in https://bugs.python.org/issue5380
759+
try
760+
eof(pty_master) && !Sys.islinux()
761+
catch ex
762+
(ex isa Base.IOError && ex.code == Base.UV_EIO) || rethrow()
763+
@test_throws ex eof(pty_master) # make sure the error is sticky
764+
pty_master.readerror = nothing
765+
eof(pty_master)
766+
end
758767
end
759768
@test read(pty_master, String) == ""
760769
wait(p)

0 commit comments

Comments
 (0)