Skip to content

Revert "IO: tie lifetime of handle field to container" #43924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
"""
mutable struct AsyncCondition
@atomic handle::Ptr{Cvoid}
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
@atomic isopen::Bool
isopen::Bool
@atomic set::Bool

function AsyncCondition()
Expand Down Expand Up @@ -71,9 +71,9 @@ Note: `interval` is subject to accumulating time skew. If you need precise event
absolute time, create a new timer at each expiration with the difference to the next time computed.
"""
mutable struct Timer
@atomic handle::Ptr{Cvoid}
handle::Ptr{Cvoid}
cond::ThreadSynchronizer
@atomic isopen::Bool
isopen::Bool
@atomic set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
Expand Down Expand Up @@ -143,13 +143,12 @@ function wait(t::Union{Timer, AsyncCondition})
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL
isopen(t::Union{Timer, AsyncCondition}) = t.isopen

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if isopen(t)
@atomic :monotonic t.isopen = false
preserve_handle(t)
if t.handle != C_NULL && isopen(t)
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
Expand All @@ -160,11 +159,13 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
iolock_begin()
lock(t.cond)
try
if isopen(t)
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
@atomic :monotonic t.handle = C_NULL
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
t.handle = C_NULL
notify(t.cond, false)
end
finally
Expand All @@ -177,9 +178,8 @@ end
function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
@atomic :monotonic t.isopen = false
unpreserve_handle(t)
@atomic :monotonic t.handle = C_NULL
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
finally
unlock(t.cond)
Expand Down
7 changes: 2 additions & 5 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ function preserve_handle(x)
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = get(uvhandles, x, 0)::Int
if v == 0
unlock(preserve_handle_lock)
error("unbalanced call to unpreserve_handle for $(typeof(x))")
elseif v == 1
v = uvhandles[x]::Int
if v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
Expand Down
7 changes: 3 additions & 4 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
disassociate_julia_struct(proc.handle)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
proc.handle = C_NULL
lock(proc.exitnotify)
Expand All @@ -66,7 +65,7 @@ end

# called when the libuv handle is destroyed
function _uv_hook_close(proc::Process)
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
proc.handle = C_NULL
nothing
end

Expand Down Expand Up @@ -588,10 +587,10 @@ Get the child process ID, if it still exists.
This function requires at least Julia 1.1.
"""
function Libc.getpid(p::Process)
# TODO: due to threading, this method is only weakly synchronized with the user application
# TODO: due to threading, this method is no longer synchronized with the user application
iolock_begin()
ppid = Int32(0)
if p.handle != C_NULL # e.g. process_running
if p.handle != C_NULL
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
end
iolock_end()
Expand Down
27 changes: 10 additions & 17 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
end

function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
if x.status == StatusUninit || x.status == StatusInit
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
Expand Down Expand Up @@ -496,39 +496,34 @@ end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
preserve_handle(stream)
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
iolock_end()
wait_close(stream)
should_wait && wait_close(stream)
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
uv.handle == C_NULL && return
iolock_begin()
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status == StatusUninit
Libc.free(uv.handle)
elseif uv.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
elseif isopen(uv)
if uv.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
end
elseif uv.status == StatusClosed
if uv.status != StatusUninit
close(uv)
else
Libc.free(uv.handle)
end
uv.handle = C_NULL
uv.status = StatusClosed
uv.handle = C_NULL
end
iolock_end()
nothing
Expand Down Expand Up @@ -672,15 +667,13 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
notify(stream.cond)
else
# underlying stream is no longer useful: begin finalization
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
else
stream.readerror = _UVError("read", nread)
# This is a fatal connection error
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
Expand Down Expand Up @@ -718,9 +711,9 @@ function reseteof(x::TTY)
end

function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
unpreserve_handle(uv)
lock(uv.cond)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.cond)
Expand Down
3 changes: 2 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ static void jl_close_item_atexit(uv_handle_t *handle)
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
handle->data = NULL;
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
Expand Down
50 changes: 27 additions & 23 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ JL_DLLEXPORT void jl_iolock_end(void)
}


static void jl_uv_call_close_callback(jl_value_t *val)
void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
Expand Down Expand Up @@ -105,7 +105,6 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
jl_uv_call_close_callback((jl_value_t*)handle->data);
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async)
return;
Expand All @@ -126,10 +125,6 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
free(req);
return;
}
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
Expand All @@ -139,9 +134,12 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return;
}
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
}

static void uv_flush_callback(uv_write_t *req, int status)
Expand Down Expand Up @@ -224,41 +222,47 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
JL_UV_LOCK();
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
return;
}
else if (handle->type == UV_FILE) {
JL_UV_LOCK();
if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
if ((ssize_t)fd->file != -1) {
uv_fs_close(handle->loop, &req, fd->file, NULL);
fd->file = (uv_os_fd_t)(ssize_t)-1;
}
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
JL_UV_UNLOCK();
return;
}
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
// flush the stream write-queue first
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
}
else {
uv_close(handle, &jl_uv_closeHandle);
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
JL_UV_UNLOCK();
return;
}

// avoid double-closing the stream
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
if (!uv_is_closing(handle)) { // avoid double-closing the stream
// avoid double-closing the stream
if (!uv_is_closing(handle)) {
JL_UV_LOCK();
if (!uv_is_closing(handle)) { // double-check
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
Expand Down
1 change: 1 addition & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

extern uv_loop_t *jl_io_loop;
void jl_uv_flush(uv_stream_t *stream);
void jl_uv_call_close_callback(jl_value_t *val);

typedef struct jl_typeenv_t {
jl_tvar_t *var;
Expand Down
Loading