Skip to content

Commit cfbddaf

Browse files
vtjnash authored and mkitti committed
add dedicated IO thread (capability only) (JuliaLang#53422)
A dedicated IO thread has been an often-requested feature. It turns out that one effectively already exists, except that we hard-code the identity of that thread as thread 0. This PR replaces every place where that assumption is hard-coded with a variable, so that those places are easier to search for in the code. It also adds an internal function (`jl_set_io_loop_tid`) that can be used to transfer ownership of the loop to any (valid) tid. In conjunction with the prior foreign-threads work and the foreign-thread pool, this lets us spawn a dedicated IO-management thread with this bit of code:

```julia
function make_io_thread()
    tid = UInt[0]
    threadwork = @cfunction function(arg::Ptr{Cvoid})
        Base.errormonitor(current_task()) # this may not go particularly well if the IO loop is dead, but try anyways
        @ccall jl_set_io_loop_tid((Threads.threadid() - 1)::Int16)::Cvoid
        wait() # spin uv_run as long as needed
        nothing
    end Cvoid (Ptr{Cvoid},)
    err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, C_NULL::Ptr{Cvoid})::Cint
    err == 0 || Base.uv_error("uv_thread_create", err)
    err = @ccall uv_thread_detach(tid::Ptr{UInt})::Cint
    err == 0 || Base.uv_error("uv_thread_detach", err)
    # n.b. this does not wait for the thread to start or to take ownership of the event loop
    nothing
end
```
1 parent 71040d1 commit cfbddaf

File tree

5 files changed

+35
-36
lines changed

5 files changed

+35
-36
lines changed

base/stream.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ function notify_filled(buffer::IOBuffer, nread::Int)
619619
buffer.size += nread
620620
else
621621
buffer.ptr += nread
622+
buffer.size = max(buffer.size, buffer.ptr - 1)
622623
end
623624
nothing
624625
end

src/jl_uv.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -373,8 +373,6 @@ JL_DLLEXPORT void *jl_uv_handle_data(uv_handle_t *handle) { return handle->data;
373373
*/
374374
JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; }
375375

376-
extern _Atomic(unsigned) _threadedregion;
377-
378376
/**
379377
* @brief Process pending UV events.
380378
*
@@ -387,7 +385,7 @@ JL_DLLEXPORT int jl_process_events(void)
387385
jl_task_t *ct = jl_current_task;
388386
uv_loop_t *loop = jl_io_loop;
389387
jl_gc_safepoint_(ct->ptls);
390-
if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == 0)) {
388+
if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == jl_atomic_load_relaxed(&io_loop_tid))) {
391389
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
392390
JL_PROBE_RT_START_PROCESS_EVENTS(ct);
393391
loop->stop_flag = 0;
@@ -637,7 +635,7 @@ JL_DLLEXPORT int jl_fs_write(uv_os_fd_t handle, const char *data, size_t len,
637635
{
638636
jl_task_t *ct = jl_get_current_task();
639637
// TODO: fix this cheating
640-
if (jl_get_safe_restore() || ct == NULL || jl_atomic_load_relaxed(&ct->tid) != 0)
638+
if (jl_get_safe_restore() || ct == NULL || jl_atomic_load_relaxed(&ct->tid) != jl_atomic_load_relaxed(&io_loop_tid))
641639
#ifdef _OS_WINDOWS_
642640
return WriteFile(handle, data, len, NULL, NULL);
643641
#else
@@ -718,7 +716,7 @@ JL_DLLEXPORT void jl_uv_puts(uv_stream_t *stream, const char *str, size_t n)
718716

719717
// TODO: Hack to make CoreIO thread-safer
720718
jl_task_t *ct = jl_get_current_task();
721-
if (ct == NULL || jl_atomic_load_relaxed(&ct->tid) != 0) {
719+
if (ct == NULL || jl_atomic_load_relaxed(&ct->tid) != jl_atomic_load_relaxed(&io_loop_tid)) {
722720
if (stream == JL_STDOUT) {
723721
fd = UV_STDOUT_FD;
724722
}

src/julia_internal.h

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -97,23 +97,6 @@ JL_DLLIMPORT void *__tsan_get_current_fiber(void);
9797
JL_DLLIMPORT void __tsan_destroy_fiber(void *fiber);
9898
JL_DLLIMPORT void __tsan_switch_to_fiber(void *fiber, unsigned flags);
9999
#endif
100-
#ifdef __cplusplus
101-
}
102-
#endif
103-
104-
// Remove when C11 is required for C code.
105-
#ifndef static_assert
106-
# ifndef __cplusplus
107-
// C11 should already have `static_assert` from `<assert.h>` so there's no need
108-
// to check C version.
109-
# ifdef __GNUC__
110-
# define static_assert _Static_assert
111-
# else
112-
# define static_assert(...)
113-
# endif
114-
# endif
115-
// For C++, C++11 or MSVC is required. Both provide `static_assert`.
116-
#endif
117100

118101
#ifndef alignof
119102
# ifndef __cplusplus
@@ -182,10 +165,8 @@ extern jl_mutex_t jl_uv_mutex;
182165
extern _Atomic(int) jl_uv_n_waiters;
183166
void JL_UV_LOCK(void);
184167
#define JL_UV_UNLOCK() JL_UNLOCK(&jl_uv_mutex)
185-
186-
#ifdef __cplusplus
187-
extern "C" {
188-
#endif
168+
extern _Atomic(unsigned) _threadedregion;
169+
extern _Atomic(uint16_t) io_loop_tid;
189170

190171
int jl_running_under_rr(int recheck) JL_NOTSAFEPOINT;
191172

src/scheduler.c

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,6 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
417417
}
418418

419419

420-
extern _Atomic(unsigned) _threadedregion;
421-
422420
JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
423421
{
424422
jl_task_t *ct = jl_current_task;
@@ -438,7 +436,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
438436

439437
jl_cpu_pause();
440438
jl_ptls_t ptls = ct->ptls;
441-
if (sleep_check_after_threshold(&start_cycles) || (ptls->tid == 0 && (!jl_atomic_load_relaxed(&_threadedregion) || wait_empty))) {
439+
if (sleep_check_after_threshold(&start_cycles) || (ptls->tid == jl_atomic_load_relaxed(&io_loop_tid) && (!jl_atomic_load_relaxed(&_threadedregion) || wait_empty))) {
442440
// acquire sleep-check lock
443441
jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
444442
jl_fence(); // [^store_buffering_1]
@@ -485,7 +483,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
485483
if (jl_atomic_load_relaxed(&_threadedregion)) {
486484
uvlock = jl_mutex_trylock(&jl_uv_mutex);
487485
}
488-
else if (ptls->tid == 0) {
486+
else if (ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
489487
uvlock = 1;
490488
JL_UV_LOCK();
491489
}
@@ -526,7 +524,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
526524
start_cycles = 0;
527525
continue;
528526
}
529-
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
527+
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
530528
// thread 0 is the only thread permitted to run the event loop
531529
// so it needs to stay alive, just spin-looping if necessary
532530
if (set_not_sleeping(ptls)) {
@@ -543,10 +541,10 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
543541
assert(wasrunning);
544542
if (wasrunning == 1) {
545543
// This was the last running thread, and there is no thread with !may_sleep
546-
// so make sure tid 0 is notified to check wait_empty
544+
// so make sure io_loop_tid is notified to check wait_empty
547545
// TODO: this also might be a good time to check again that
548546
// libuv's queue is truly empty, instead of during delete_thread
549-
if (ptls->tid != 0) {
547+
if (ptls->tid != jl_atomic_load_relaxed(&io_loop_tid)) {
550548
uv_mutex_lock(&ptls->sleep_lock);
551549
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
552550
uv_mutex_unlock(&ptls->sleep_lock);
@@ -597,7 +595,7 @@ void scheduler_delete_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT
597595
jl_fence();
598596
if (notsleeping) {
599597
if (jl_atomic_load_relaxed(&nrunning) == 1) {
600-
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[0];
598+
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[jl_atomic_load_relaxed(&io_loop_tid)];
601599
// This was the last running thread, and there is no thread with !may_sleep
602600
// so make sure tid 0 is notified to check wait_empty
603601
uv_mutex_lock(&ptls2->sleep_lock);

src/threading.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,8 @@ void jl_start_threads(void)
808808
uv_barrier_wait(&thread_init_done);
809809
}
810810

811-
_Atomic(unsigned) _threadedregion; // HACK: keep track of whether to prioritize IO or threading
811+
_Atomic(unsigned) _threadedregion; // keep track of whether to prioritize IO or threading
812+
_Atomic(uint16_t) io_loop_tid; // mark which thread is assigned to run the uv_loop
812813

813814
JL_DLLEXPORT int jl_in_threaded_region(void)
814815
{
@@ -829,7 +830,27 @@ JL_DLLEXPORT void jl_exit_threaded_region(void)
829830
JL_UV_UNLOCK();
830831
// make sure thread 0 is not using the sleep_lock
831832
// so that it may enter the libuv event loop instead
832-
jl_wakeup_thread(0);
833+
jl_fence();
834+
jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid));
835+
}
836+
}
837+
838+
JL_DLLEXPORT void jl_set_io_loop_tid(int16_t tid)
839+
{
840+
if (tid < 0 || tid >= jl_atomic_load_relaxed(&jl_n_threads)) {
841+
// TODO: do we care if this thread has exited or not started yet,
842+
// since ptls2 might not be defined yet and visible on all threads yet
843+
return;
844+
}
845+
jl_atomic_store_relaxed(&io_loop_tid, tid);
846+
jl_fence();
847+
if (jl_atomic_load_relaxed(&_threadedregion) == 0) {
848+
// make sure the previous io_loop_tid leaves the libuv event loop
849+
JL_UV_LOCK();
850+
JL_UV_UNLOCK();
851+
// make sure thread io_loop_tid is not using the sleep_lock
852+
// so that it may enter the libuv event loop instead
853+
jl_wakeup_thread(tid);
833854
}
834855
}
835856

0 commit comments

Comments
 (0)