Skip to content

Commit

Permalink
scheduler: improve libuv wakeup event ordering
Browse files Browse the repository at this point in the history
Better ensures that we support nesting of threaded regions and access to
this variable from any thread. Previously, it appears that we might
toggle this variable on another thread (via thread migration), but would
then fail to successfully wake up thread 0, causing it to resume
sleeping on the event loop.
  • Loading branch information
vtjnash committed Jan 20, 2022
1 parent 38620a1 commit c3f7396
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 52 deletions.
6 changes: 3 additions & 3 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ function _threadsfor(iter, lbody, schedule)
end
end
end
if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
if ccall(:jl_in_threaded_region, Cint, ()) != 0
$(if schedule === :static
:(error("`@threads :static` can only be used from thread 1 and not nested"))
:(error("`@threads :static` cannot be used concurrently or nested"))
else
# only use threads when called from thread 1, outside @threads
# only use threads when called from outside @threads
:(Base.invokelatest(threadsfor_fun, true))
end)
else
Expand Down
6 changes: 3 additions & 3 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ void JL_UV_LOCK(void)
if (jl_mutex_trylock(&jl_uv_mutex)) {
}
else {
jl_atomic_fetch_add(&jl_uv_n_waiters, 1);
jl_atomic_fetch_add_relaxed(&jl_uv_n_waiters, 1);
jl_wake_libuv();
JL_LOCK(&jl_uv_mutex);
jl_atomic_fetch_add(&jl_uv_n_waiters, -1);
jl_atomic_fetch_add_relaxed(&jl_uv_n_waiters, -1);
}
}

Expand Down Expand Up @@ -204,7 +204,7 @@ JL_DLLEXPORT int jl_process_events(void)
uv_loop_t *loop = jl_io_loop;
jl_gc_safepoint_(ct->ptls);
if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == 0)) {
if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
loop->stop_flag = 0;
int r = uv_run(loop, UV_RUN_NOWAIT);
JL_UV_UNLOCK();
Expand Down
73 changes: 35 additions & 38 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,18 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
return task;
}

// one thread should win this race and watch the event loop
// inside a threaded region, any thread can listen for IO messages,
// although none are allowed to create new ones
// outside of threaded regions, all IO is permitted,
// but only on thread 1
one thread should win this race and watch the event loop: inside
a threaded region, any thread can listen for IO messages,
although none are allowed to create new ones; outside of threaded
regions, all IO is permitted, but only on thread 1
// the reason this works is somewhat convoluted:
// - only thread 0 is permitted to change _threadedregion,
// so it will definitely acquire the unconditional lock after
// switching that flag
- inside a _threadedregion, there must exist at least one
thread that has a happens-before relationship on the libuv lock
before reaching this decision point in the code; that thread will
see the lock as unlocked and thus must win this race here
int uvlock = 0;
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
Expand All @@ -507,50 +514,40 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
}
if (uvlock) {
int active = 1;
if (jl_atomic_load(&jl_uv_n_waiters) != 0) {
// but if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
JL_UV_UNLOCK();
}
else {
// otherwise, we may block until someone asks us for the lock
uv_loop_t *loop = jl_global_event_loop();
// otherwise, we block until someone asks us for the lock
uv_loop_t *loop = jl_global_event_loop();
while (active && may_sleep(ptls)) {
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
// but if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
break;
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
jl_gc_safepoint();
if (may_sleep(ptls)) {
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do
if (!may_sleep(ptls)) {
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
start_cycles = 0;
continue;
}
// otherwise, we got a spurious wakeup since some other
// thread that just wanted to steal libuv from us,
// just go right back to sleep on the other wake signal
// to let them take it from us without conflict
// TODO: this relinquishes responsibility for all event
// to the last thread to do an explicit operation,
// which may starve other threads of critical work
if (jl_atomic_load(&jl_uv_n_waiters) == 0) {
continue;
}
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do.
// Otherwise we got a spurious wakeup since some other thread
// that just wanted to steal libuv from us. We will just go
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (!may_sleep(ptls)) {
start_cycles = 0;
continue;
}
if (!jl_atomic_load_relaxed(&_threadedregion) && active && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive
// so it needs to stay alive, just spin-looping if necessary
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
continue;
}
}

// the other threads will just wait for on signal to resume
// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&sleep_locks[ptls->tid]);
Expand Down
18 changes: 10 additions & 8 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ _Atomic(unsigned) _threadedregion; // HACK: keep track of whether to prioritize

JL_DLLEXPORT int jl_in_threaded_region(void)
{
return jl_atomic_load_relaxed(&jl_current_task->tid) != 0 ||
jl_atomic_load_relaxed(&_threadedregion) != 0;
return jl_atomic_load_relaxed(&_threadedregion) != 0;
}

JL_DLLEXPORT void jl_enter_threaded_region(void)
Expand All @@ -542,12 +541,15 @@ JL_DLLEXPORT void jl_enter_threaded_region(void)

JL_DLLEXPORT void jl_exit_threaded_region(void)
{
jl_atomic_fetch_add(&_threadedregion, -1);
jl_wake_libuv();
// make sure no more callbacks will run while user code continues
// outside thread region and might touch an I/O object.
JL_UV_LOCK();
JL_UV_UNLOCK();
if (jl_atomic_fetch_add(&_threadedregion, -1) == 1) {
// make sure no more callbacks will run while user code continues
// outside thread region and might touch an I/O object.
JL_UV_LOCK();
JL_UV_UNLOCK();
// make sure thread 0 is not using the sleep_lock
// so that it may enter the libuv event loop instead
jl_wakeup_thread(0);
}
}


Expand Down

0 comments on commit c3f7396

Please sign in to comment.