Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ New library features
write the output to a stream rather than returning a string ([#48625]).
* `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]).
* New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]).
* Passing an `IOBuffer` as a stdout argument for `Process` spawn now works as
expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is
no longer required there for correctness to avoid data races ([#TBD]).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops — "TBD" should have been updated.

* After a process exits, `closewrite` will no longer be automatically called on
the stream passed to that process. Call `wait` on the process instead to ensure
the content is fully written, then call `closewrite` manually to avoid
data races. Alternatively, use the callback form of `open` to have all of that
handled automatically.

Standard library changes
------------------------
Expand Down
1 change: 1 addition & 0 deletions base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct DevNull <: IO end
# The shared `DevNull` instance.
const devnull = DevNull()
# Writing a single byte reports 1 as the result; nothing is stored.
write(::DevNull, ::UInt8) = 1
# Bulk write: returns `n` (converted by the `::Int` return annotation); the
# pointer argument is not used by this method.
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
# No-op methods: both simply return `nothing`.
closewrite(::DevNull) = nothing
close(::DevNull) = nothing
# Forwards to the zero-argument `wait()`.
# NOTE(review): presumably this parks the caller because devnull never closes — confirm.
wait_close(::DevNull) = wait()
# Always reports zero bytes available.
bytesavailable(io::DevNull) = 0
Expand Down
10 changes: 10 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,16 @@ public
@locals,
@propagate_inbounds,

# IO
# types
BufferStream,
IOServer,
OS_HANDLE,
PipeEndpoint,
TTY,
# functions
reseteof,

# misc
notnothing,
runtests,
Expand Down
2 changes: 2 additions & 0 deletions base/filesystem.jl
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ function close(f::File)
nothing
end

# No-op for `File`: this method does nothing and returns `nothing`.
closewrite(f::File) = nothing

# sendfile is the most efficient way to copy from a file descriptor
function sendfile(dst::File, src::File, src_offset::Int64, bytes::Int)
check_open(dst)
Expand Down
12 changes: 12 additions & 0 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ end

# Fallback locking hooks for any `IO`: both are no-ops that return `nothing`.
lock(::IO) = nothing
unlock(::IO) = nothing

"""
    reseteof(io)

Clear the EOF flag from `io` so that further reads (and possibly writes) are
again allowed. Note that the flag may immediately be set again if the
underlying stream object is at EOF and cannot be resumed.

The fallback method for a generic `IO` does nothing and returns `nothing`.
"""
reseteof(x::IO) = nothing

const SZ_UNBUFFERED_IO = 65536
Expand Down Expand Up @@ -68,6 +76,10 @@ Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
first. Notify the other end that no more data will be written to the underlying
file. This is not supported by all IO types.

If implemented, `closewrite` causes subsequent `read` or `eof` calls that would
block to instead throw EOF or return true, respectively. If the stream is
already closed, this is idempotent.

# Examples
```jldoctest
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task
Expand Down
1 change: 0 additions & 1 deletion base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)

# Mark the buffer as no longer writable; always returns `nothing`.
function closewrite(io::GenericIOBuffer)
    io.writable = false
    return nothing
end

Expand Down
2 changes: 2 additions & 0 deletions base/iostream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ function close(s::IOStream)
systemerror("close", bad)
end

# No-op for `IOStream`: this method does nothing and returns `nothing`.
closewrite(s::IOStream) = nothing

function flush(s::IOStream)
sigatomic_begin()
bad = @_lock_ios s ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0
Expand Down
131 changes: 84 additions & 47 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe
in::IO
out::IO
err::IO
syncd::Vector{Task}
exitcode::Int64
termsignal::Int32
exitnotify::ThreadSynchronizer
function Process(cmd::Cmd, handle::Ptr{Cvoid})
this = new(cmd, handle, devnull, devnull, devnull,
function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task})
this = new(cmd, handle, devnull, devnull, devnull, syncd,
typemin(fieldtype(Process, :exitcode)),
typemin(fieldtype(Process, :termsignal)),
ThreadSynchronizer())
Expand All @@ -35,6 +36,15 @@ end
# Pipe accessors for a ProcessChain: the readable end is its `out` field and
# the writable end is its `in` field.
pipe_reader(p::ProcessChain) = p.out
pipe_writer(p::ProcessChain) = p.in

# A lightweight pair of a child OS_HANDLE and an associated Task that will
# complete only after all content has been read from it, used for
# synchronizing state without the kernel's aid.
struct SyncCloseFD
    fd        # the wrapped child handle (untyped field); `rawhandle` forwards to it
    t::Task   # the Task to wait on before treating the I/O as complete
end
# Forward to the wrapped `fd`, so a SyncCloseFD yields the same raw handle as
# the object it wraps.
rawhandle(io::SyncCloseFD) = rawhandle(io.fd)

# release ownership of the libuv handle
function uvfinalize(proc::Process)
if proc.handle != C_NULL
Expand Down Expand Up @@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process)
nothing
end

const SpawnIO = Union{IO, RawFD, OS_HANDLE}
const SpawnIOs = Vector{SpawnIO} # convenience name for readability
const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD
const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable})

function as_cpumask(cpus::Vector{UInt16})
n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ())))
Expand All @@ -100,6 +110,7 @@ end
error("invalid spawn handle $h from $io")
end
for io in stdio]
syncd = Task[io.t for io in stdio if io isa SyncCloseFD]
handle = Libc.malloc(_sizeof_uv_process)
disassociate_julia_struct(handle)
(; exec, flags, env, dir) = cmd
Expand All @@ -117,7 +128,7 @@ end
cpumask === nothing ? 0 : length(cpumask),
@cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32)))
if err == 0
pp = Process(cmd, handle)
pp = Process(cmd, handle, syncd)
associate_julia_struct(handle, pp)
else
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually
Expand All @@ -130,23 +141,24 @@ end
return pp
end

_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[])
_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs())

# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
function _spawn(cmd::Cmd, stdios::SpawnIOs)
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable})
pp = setup_stdios(stdios) do stdios
return _spawn_primitive(cmd.exec[1], cmd, stdios)
return _spawn(cmd, stdios)
end
return pp
end

# Fast path: a plain `Cmd` is handed straight to `_spawn_primitive`, so no
# ProcessChain needs to be allocated. An empty `exec` vector is rejected first.
function _spawn(cmd::Cmd, stdios::SpawnIOs)
    if isempty(cmd.exec)
        throw(ArgumentError("cannot spawn empty command"))
    end
    program = cmd.exec[1]
    return _spawn_primitive(program, cmd, stdios)
end

# assume that having a ProcessChain means that the stdio are setup
function _spawn(cmds::AbstractCmd, stdios::SpawnIOs)
pp = setup_stdios(stdios) do stdios
return _spawn(cmds, stdios, ProcessChain())
end
return pp
return _spawn(cmds, stdios, ProcessChain())
end

# helper function for making a copy of a SpawnIOs, with replacement
Expand Down Expand Up @@ -212,7 +224,7 @@ end


# open the child end of each element of `stdios`, and initialize the parent end
function setup_stdios(f, stdios::SpawnIOs)
function setup_stdios(f, stdios::Vector{Redirectable})
nstdio = length(stdios)
open_io = SpawnIOs(undef, nstdio)
close_io = falses(nstdio)
Expand Down Expand Up @@ -295,25 +307,26 @@ function setup_stdio(stdio::IO, child_readable::Bool)
child = child_readable ? rd : wr
try
let in = (child_readable ? parent : stdio),
out = (child_readable ? stdio : parent)
@async try
out = (child_readable ? stdio : parent),
t = @async try
write(in, out)
catch ex
@warn "Process I/O error" exception=(ex, catch_backtrace())
rethrow()
finally
close(parent)
child_readable || closewrite(stdio)
end
return (SyncCloseFD(child, t), true)
end
catch
close_pipe_sync(child)
rethrow()
end
return (child, true)
end

close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
close_stdio(stdio) = close(stdio)
close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd)

# INTERNAL
# pad out stdio to have at least three elements,
Expand All @@ -325,19 +338,19 @@ close_stdio(stdio) = close(stdio)
# - A Filesystem.File or IOStream object to redirect the output to
# - A FileRedirect, containing a string specifying a filename to be opened for the child

spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...]
spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...]
spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...]
spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...]
spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) =
SpawnIO[in, out, err]
Redirectable[in, out, err]
# pass original descriptors to child processes by default, because we might
# have already exhausted and closed the libuv object for our standard streams.
# ref issue #8529
spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) =
SpawnIO[in, out, err]
Redirectable[in, out, err]

function eachline(cmd::AbstractCmd; keep::Bool=false)
out = PipeEndpoint()
processes = _spawn(cmd, SpawnIO[devnull, out, stderr])
processes = _spawn(cmd, Redirectable[devnull, out, stderr])
# if the user consumes all the data, also check process exit status for success
ondone = () -> (success(processes) || pipeline_error(processes); nothing)
return EachLine(out, keep=keep, ondone=ondone)::EachLine
Expand Down Expand Up @@ -385,20 +398,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false,
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode"))
in = PipeEndpoint()
out = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[in, out, stderr])
processes = _spawn(cmds, Redirectable[in, out, stderr])
processes.in = in
processes.out = out
elseif read
out = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[stdio, out, stderr])
processes = _spawn(cmds, Redirectable[stdio, out, stderr])
processes.out = out
elseif write
in = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[in, stdio, stderr])
processes = _spawn(cmds, Redirectable[in, stdio, stderr])
processes.in = in
else
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode"))
processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr])
processes = _spawn(cmds, Redirectable[devnull, devnull, stderr])
end
return processes
end
Expand All @@ -415,12 +428,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
P = open(cmds, args...; kwargs...)
function waitkill(P::Union{Process,ProcessChain})
close(P)
# 0.1 seconds after we hope it dies (from closing stdio),
# we kill the process with SIGTERM (15)
local t = Timer(0.1) do t
# shortly after we hope it starts cleanup and dies (from closing
# stdio), we kill the process with SIGTERM (15) so that we can proceed
# with throwing the error and hope it will exit soon from that
local t = Timer(2) do t
process_running(P) && kill(P)
end
wait(P)
# pass false to indicate that we do not care about data-races on the
# Julia stdio objects after this point, since we already know this is
# an error path and the state of them is fairly unpredictable anyways
# in that case. Since we closed P some of those should come crumbling
# down already, and we don't want to throw that error here either.
wait(P, false)
close(t)
end
ret = try
Expand All @@ -430,10 +449,23 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
rethrow()
end
close(P.in)
closestdio = @async begin
# wait for P to complete (including sync'd), then mark the output streams for EOF (if applicable to that stream type)
wait(P)
err = P.err
applicable(closewrite, err) && closewrite(err)
out = P.out
applicable(closewrite, out) && closewrite(out)
nothing
end
# now verify that the output stream is at EOF, and the user didn't fail to consume it successfully
# (we do not currently verify the user dealt with the stderr stream)
if !(eof(P.out)::Bool)
waitkill(P)
throw(_UVError("open(do)", UV_EPIPE))
end
# make sure closestdio is completely done to avoid data-races later
wait(closestdio)
success(P) || pipeline_error(P)
return ret
end
Expand Down Expand Up @@ -650,26 +682,31 @@ function process_status(s::Process)
error("process status error")
end

function wait(x::Process)
process_exited(x) && return
iolock_begin()
function wait(x::Process, syncd::Bool=true)
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
iolock_begin()
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
end
else
iolock_end()
end
else
iolock_end()
end
# and make sure all sync'd Tasks are complete too
syncd && for t in x.syncd
wait(t)
end
nothing
end

wait(x::ProcessChain) = foreach(wait, x.processes)
wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes)

# Print a one-line summary of the process: its command followed by its status.
function show(io::IO, p::Process)
    return print(io, "Process(", p.cmd, ", ", process_status(p), ")")
end

Expand Down
Loading