Skip to content

Commit b029fbf

Browse files
authored
Wait for all tasks before writing precompile files (#46571)
When the program goes to write out a precompile file, we would like the process to first reach a point where it is no longer running background tasks or other work. This ensures that the precompile file is in a consistent state, and isn't forgetting or delaying intended work. In the future, we may want to add an `atexit` hook (after the other `atexit` hooks) which optionally calls this function for regular code too, probably under programmatic control and/or command line argument control for the user to decide. We would also need to decide how to close stdin first, so that it doesn't continue to keep the program alive. Add uv_ref and uv_unref internal hooks for this. You probably really don't want to call these (as they may also stop you from getting events on these objects), but very specific internal things will need them for this to work. Also (mostly unrelated) rewrite a Profile test to conform to best practices. Previously, the loop was expecting to observe the Profile test printing even though nothing kept it alive (there were no reads on stdin). We fix the design of that test, but also include a patch inside `jl_process_events` to ensure the loop is alive and will handle events, to avoid breaking anyone else who was relying on this pattern. To help package authors fix errors, we automatically print a note if this new functionality is causing delays. They then need to ensure that they call close explicitly (not relying solely on finalizers) when appropriate, and that they also clean up other resources (or call the new `Base.uv_unref`). Fix #45170
1 parent 87b8896 commit b029fbf

File tree

10 files changed

+146
-17
lines changed

10 files changed

+146
-17
lines changed

base/libuv.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ uv_error(prefix::AbstractString, c::Integer) = c < 0 ? throw(_UVError(prefix, c)
103103

104104
eventloop() = ccall(:jl_global_event_loop, Ptr{Cvoid}, ())
105105

106+
# Internal hook: forward the raw handle pointer `h` to libuv's `uv_unref`.
# The commit that introduced this warns that you probably do not want to call
# it yourself, as it may also stop you from getting events on that object.
function uv_unref(h::Ptr{Cvoid})
    return ccall(:uv_unref, Cvoid, (Ptr{Cvoid},), h)
end
107+
# Internal hook: forward the raw handle pointer `h` to libuv's `uv_ref`.
# Counterpart of `uv_unref`; intended for very specific internal uses only.
function uv_ref(h::Ptr{Cvoid})
    return ccall(:uv_ref, Cvoid, (Ptr{Cvoid},), h)
end
108+
106109
# Call the runtime's `jl_process_events` and hand back its `Int32` result.
process_events() = ccall(:jl_process_events, Int32, ())

contrib/generate_precompile.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ generate_precompile_statements() = try # Make sure `ansi_enablecursor` is printe
316316
current = current == 4 ? 1 : current + 1
317317
end
318318
end
319+
close(t)
319320
end
320321

321322
# Collect statements from running the script

src/gc.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2958,6 +2958,7 @@ static void jl_gc_queue_thread_local(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp
29582958
}
29592959

29602960
extern jl_value_t *cmpswap_names JL_GLOBALLY_ROOTED;
2961+
extern jl_task_t *wait_empty JL_GLOBALLY_ROOTED;
29612962

29622963
// mark the initial root set
29632964
static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
@@ -2996,6 +2997,8 @@ static void mark_roots(jl_gc_mark_cache_t *gc_cache, jl_gc_mark_sp_t *sp)
29962997
gc_mark_queue_obj(gc_cache, sp, jl_emptytuple_type);
29972998
if (cmpswap_names != NULL)
29982999
gc_mark_queue_obj(gc_cache, sp, cmpswap_names);
3000+
if (wait_empty != NULL)
3001+
gc_mark_queue_obj(gc_cache, sp, wait_empty);
29993002
gc_mark_queue_obj(gc_cache, sp, jl_global_roots_table);
30003003
}
30013004

src/jl_uv.c

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,68 @@ extern "C" {
3030
#endif
3131

3232
static uv_async_t signal_async;
33+
static uv_timer_t wait_empty_worker;
34+
35+
// uv_walk() callback: print one diagnostic line for a handle.
//
// Handles that are inactive or hold no reference are skipped. For the rest,
// the line shows the libuv type name, the PID (for UV_PROCESS handles) or the
// file descriptor reported by uv_fileno (omitted when unavailable), padding so
// the pointer column lines up, the handle address, and its `data` pointer.
// `arg` is unused.
static void walk_print_cb(uv_handle_t *h, void *arg)
{
    (void)arg;
    if (!uv_is_active(h) || !uv_has_ref(h))
        return;
    const char *type = uv_handle_type_name(h->type);
    if (!type)
        type = "<unknown>";
    uv_os_fd_t fd;
    if (h->type == UV_PROCESS)
        fd = uv_process_get_pid((uv_process_t*)h);
    else if (uv_fileno(h, &fd))
        fd = (uv_os_fd_t)-1; // no descriptor available for this handle
    // The padding budget must really be 16 characters wide: with a shorter
    // literal the computation below always consumes all of it and the
    // columns never line up.
    const char *pad = "                "; // 16 spaces
    const size_t padlen = strlen(pad);
    // width of the printed fd/pid (0 when there is none to print)
    int npad = fd == -1 ? 0 : snprintf(NULL, 0, "%zd", (size_t)fd);
    if (npad < 0)
        npad = 0;
    // Characters already consumed by the type name and the number; do the
    // arithmetic in size_t to avoid a signed/unsigned comparison.
    size_t used = (size_t)npad + strlen(type);
    // Skip that many characters of padding, but never run past the end.
    pad += used < padlen ? used : padlen;
    if (fd == -1)
        jl_safe_printf(" %s %s@%p->%p\n", type, pad, (void*)h, (void*)h->data);
    else
        jl_safe_printf(" %s[%zd] %s@%p->%p\n", type, (size_t)fd, pad, (void*)h, (void*)h->data);
}
58+
59+
// Timer callback for wait_empty_worker (jl_wait_empty_begin starts it with a
// timeout of 10 and a repeat of 15000 -- milliseconds per the libuv timer API).
// If the loop is still alive when it fires, print a "waiting for IO to finish"
// report: a header with the current pid, then one line per handle on
// jl_io_loop as produced by walk_print_cb. Finally run a full GC.
static void wait_empty_func(uv_timer_t *t)
60+
{
61+
// make sure this is hidden now, since we would auto-unref it later
62+
uv_unref((uv_handle_t*)&signal_async);
63+
// nothing is keeping the loop alive, so there is nothing to report
if (!uv_loop_alive(t->loop))
64+
return;
65+
jl_safe_printf("\n[pid %zd] waiting for IO to finish:\n"
66+
" TYPE[FD/PID] @UV_HANDLE_T->DATA\n",
67+
(size_t)uv_os_getpid());
68+
uv_walk(jl_io_loop, walk_print_cb, NULL);
69+
// NOTE(review): presumably this gives finalizers a chance to close handles
// that are no longer referenced -- confirm against the commit discussion
jl_gc_collect(JL_GC_FULL);
70+
}
71+
72+
// Arm the "waiting for IO to finish" diagnostic timer.
//
// Under the libuv lock, and only if wait_empty_worker is not already a
// UV_TIMER and jl_io_loop exists: run one non-blocking loop iteration, then
// initialise wait_empty_worker on jl_io_loop and start it with
// wait_empty_func as callback (timeout 10, repeat 15000). Otherwise this is a
// no-op apart from taking and releasing the lock.
void jl_wait_empty_begin(void)
73+
{
74+
JL_UV_LOCK();
75+
if (wait_empty_worker.type != UV_TIMER && jl_io_loop) {
76+
// try to purge anything that is just waiting for cleanup
77+
jl_io_loop->stop_flag = 0;
78+
uv_run(jl_io_loop, UV_RUN_NOWAIT);
79+
uv_timer_init(jl_io_loop, &wait_empty_worker);
80+
// refresh the loop's cached time before arming the timer
uv_update_time(jl_io_loop);
81+
uv_timer_start(&wait_empty_worker, wait_empty_func, 10, 15000);
82+
// NOTE(review): unref'd presumably so the diagnostic timer itself does not
// count as pending work keeping the loop alive -- confirm
uv_unref((uv_handle_t*)&wait_empty_worker);
83+
}
84+
JL_UV_UNLOCK();
85+
}
86+
87+
void jl_wait_empty_end(void)
88+
{
89+
JL_UV_LOCK();
90+
uv_close((uv_handle_t*)&wait_empty_worker, NULL);
91+
JL_UV_UNLOCK();
92+
}
93+
94+
3395

3496
static void jl_signal_async_cb(uv_async_t *hdl)
3597
{
@@ -49,6 +111,7 @@ jl_mutex_t jl_uv_mutex;
49111
// Initialise the libuv-related globals of this file: register signal_async on
// jl_io_loop with jl_signal_async_cb as its callback, drop its loop reference,
// and initialise jl_uv_mutex.
void jl_init_uv(void)
50112
{
51113
uv_async_init(jl_io_loop, &signal_async, jl_signal_async_cb);
114+
// NOTE(review): presumably unref'd so this always-present handle does not by
// itself keep the loop alive; jl_process_events re-refs it around uv_run
uv_unref((uv_handle_t*)&signal_async);
52115
JL_MUTEX_INIT(&jl_uv_mutex); // a file-scope initializer can be used instead
53116
}
54117

@@ -110,7 +173,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
110173
ct->world_age = last_age;
111174
return;
112175
}
113-
if (handle == (uv_handle_t*)&signal_async)
176+
if (handle == (uv_handle_t*)&signal_async || handle == (uv_handle_t*)&wait_empty_worker)
114177
return;
115178
free(handle);
116179
}
@@ -213,7 +276,9 @@ JL_DLLEXPORT int jl_process_events(void)
213276
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
214277
JL_PROBE_RT_START_PROCESS_EVENTS(ct);
215278
loop->stop_flag = 0;
279+
uv_ref((uv_handle_t*)&signal_async); // force the loop alive
216280
int r = uv_run(loop, UV_RUN_NOWAIT);
281+
uv_unref((uv_handle_t*)&signal_async);
217282
JL_PROBE_RT_FINISH_PROCESS_EVENTS(ct);
218283
JL_UV_UNLOCK();
219284
return r;

src/julia.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,6 +1760,7 @@ JL_DLLEXPORT void jl_init_with_image(const char *julia_bindir,
17601760
JL_DLLEXPORT const char *jl_get_default_sysimg_path(void);
17611761
JL_DLLEXPORT int jl_is_initialized(void);
17621762
JL_DLLEXPORT void jl_atexit_hook(int status);
1763+
JL_DLLEXPORT void jl_task_wait_empty(void);
17631764
JL_DLLEXPORT void jl_postoutput_hook(void);
17641765
JL_DLLEXPORT void JL_NORETURN jl_exit(int status);
17651766
JL_DLLEXPORT void JL_NORETURN jl_raise(int signo);

src/partr.c

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,27 @@ static int check_empty(jl_value_t *checkempty)
281281
return jl_apply_generic(checkempty, NULL, 0) == jl_true;
282282
}
283283

284+
jl_task_t *wait_empty JL_GLOBALLY_ROOTED;
285+
void jl_wait_empty_begin(void);
286+
void jl_wait_empty_end(void);
287+
288+
// Call Base's zero-argument `wait` from the current task, bracketed by
// jl_wait_empty_begin() / jl_wait_empty_end().
//
// Does nothing unless the current task's tid is 0 and jl_base_module is set.
// While the call is in progress the global `wait_empty` holds the current
// task; jl_task_get_next checks it on thread 0 and returns that task instead
// of sleeping.
void jl_task_wait_empty(void)
289+
{
290+
jl_task_t *ct = jl_current_task;
291+
if (jl_atomic_load_relaxed(&ct->tid) == 0 && jl_base_module) {
292+
jl_wait_empty_begin();
293+
// look up the binding named `wait` in Base; may be NULL, checked below
jl_value_t *f = jl_get_global(jl_base_module, jl_symbol("wait"));
294+
// publish this task for the scheduler to hand back
wait_empty = ct;
295+
// run the call in the newest world, restoring the old world age afterwards
size_t lastage = ct->world_age;
296+
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
297+
if (f)
298+
jl_apply_generic(f, NULL, 0);
299+
ct->world_age = lastage;
300+
wait_empty = NULL;
301+
jl_wait_empty_end();
302+
}
303+
}
304+
284305
static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
285306
{
286307
// sleep_check_state is only transitioned from not_sleeping to sleeping
@@ -312,7 +333,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
312333

313334
jl_cpu_pause();
314335
jl_ptls_t ptls = ct->ptls;
315-
if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
336+
if (sleep_check_after_threshold(&start_cycles) || (ptls->tid == 0 && (!jl_atomic_load_relaxed(&_threadedregion) || wait_empty))) {
316337
// acquire sleep-check lock
317338
jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
318339
jl_fence(); // [^store_buffering_1]
@@ -409,6 +430,14 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
409430
int8_t gc_state = jl_gc_safe_enter(ptls);
410431
uv_mutex_lock(&ptls->sleep_lock);
411432
while (may_sleep(ptls)) {
433+
if (ptls->tid == 0 && wait_empty) {
434+
task = wait_empty;
435+
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
436+
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
437+
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
438+
}
439+
break;
440+
}
412441
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
413442
// TODO: help with gc work here, if applicable
414443
}
@@ -417,6 +446,11 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
417446
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
418447
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
419448
start_cycles = 0;
449+
if (task) {
450+
assert(task == wait_empty);
451+
wait_empty = NULL;
452+
return task;
453+
}
420454
}
421455
else {
422456
// maybe check the kernel for new messages too

src/precompile.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
7575
return;
7676
}
7777

78+
jl_task_wait_empty();
79+
7880
if (!jl_module_init_order) {
7981
jl_printf(JL_STDERR, "WARNING: --output requested, but no modules defined during run\n");
8082
return;

stdlib/Profile/src/Profile.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,9 @@ function __init__()
156156
# used, if not manually initialized before that.
157157
@static if !Sys.iswindows()
158158
# triggering a profile via signals is not implemented on windows
159-
PROFILE_PRINT_COND[] = Base.AsyncCondition()
159+
cond = Base.AsyncCondition()
160+
Base.uv_unref(cond.handle)
161+
PROFILE_PRINT_COND[] = cond
160162
ccall(:jl_set_peek_cond, Cvoid, (Ptr{Cvoid},), PROFILE_PRINT_COND[].handle)
161163
errormonitor(Threads.@spawn(profile_printing_listener()))
162164
end

stdlib/Profile/test/runtests.jl

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -198,41 +198,48 @@ if Sys.isbsd() || Sys.islinux()
198198
@testset "SIGINFO/SIGUSR1 profile triggering" begin
199199
let cmd = Base.julia_cmd()
200200
script = """
201-
x = rand(1000, 1000)
202-
println(stderr, "started")
203-
while true
204-
x * x
205-
yield()
206-
end
201+
print(stderr, "started\n")
202+
eof(stdin)
203+
close(t)
207204
"""
208205
iob = Base.BufferStream()
209-
p = run(pipeline(`$cmd -e $script`, stderr = iob, stdout = devnull), wait = false)
206+
notify_exit = Base.PipeEndpoint()
207+
p = run(pipeline(`$cmd -e $script`, stdin=notify_exit, stderr=iob, stdout=devnull), wait=false)
210208
t = Timer(120) do t
211209
# should be under 10 seconds, so give it 2 minutes then report failure
212210
println("KILLING BY PROFILE TEST WATCHDOG\n")
213211
kill(p, Base.SIGTERM)
214212
sleep(10)
215213
kill(p, Base.SIGKILL)
216-
close(iob)
214+
close(p)
217215
end
218216
try
219-
s = readuntil(iob, "started", keep = true)
217+
s = readuntil(iob, "started", keep=true)
220218
@assert occursin("started", s)
221219
@assert process_running(p)
222-
for _ in 1:2
223-
sleep(2.5)
220+
for i in 1:2
221+
i > 1 && sleep(5)
224222
if Sys.isbsd()
225223
kill(p, 29) # SIGINFO
226224
elseif Sys.islinux()
227225
kill(p, 10) # SIGUSR1
228226
end
229-
s = readuntil(iob, "Overhead ╎", keep = true)
227+
s = readuntil(iob, "Overhead ╎", keep=true)
230228
@test process_running(p)
229+
readavailable(iob)
231230
@test occursin("Overhead ╎", s)
232231
end
233-
finally
234-
kill(p, Base.SIGKILL)
232+
close(notify_exit) # notify test finished
233+
s = read(iob, String) # consume test output
234+
wait(p) # wait for test completion
235+
close(t)
236+
catch
237+
close(notify_exit)
238+
errs = read(iob, String) # consume test output
239+
isempty(errs) || println("CHILD STDERR after test failure: ", errs)
240+
wait(p) # wait for test completion
235241
close(t)
242+
rethrow()
236243
end
237244
end
238245
end

test/precompile.jl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ precompile_test_harness(false) do dir
261261
262262
# check that @ccallable works from precompiled modules
263263
Base.@ccallable Cint f35014(x::Cint) = x+Cint(1)
264+
265+
# check that Tasks work from serialized state
266+
ch1 = Channel(x -> nothing)
267+
ch2 = Channel(x -> (push!(x, 2); nothing), Inf)
264268
end
265269
""")
266270
# Issue #12623
@@ -310,6 +314,13 @@ precompile_test_harness(false) do dir
310314
@test Foo.layout2 == Any[Ptr{Int8}(0), Ptr{Int16}(0), Ptr{Int32}(-1)]
311315
@test typeof.(Foo.layout2) == [Ptr{Int8}, Ptr{Int16}, Ptr{Int32}]
312316
@test Foo.layout3 == ["ab", "cd", "ef", "gh", "ij"]
317+
318+
@test !isopen(Foo.ch1)
319+
@test !isopen(Foo.ch2)
320+
@test !isready(Foo.ch1)
321+
@test isready(Foo.ch2)
322+
@test take!(Foo.ch2) === 2
323+
@test !isready(Foo.ch2)
313324
end
314325

315326
@eval begin function ccallable_test()

0 commit comments

Comments
 (0)