Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ function notify_filled(buffer::IOBuffer, nread::Int)
buffer.size += nread
else
buffer.ptr += nread
buffer.size = max(buffer.size, buffer.ptr - 1)
end
nothing
end
Expand Down
8 changes: 3 additions & 5 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,6 @@ JL_DLLEXPORT void *jl_uv_handle_data(uv_handle_t *handle) { return handle->data;
*/
JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; }

extern _Atomic(unsigned) _threadedregion;

/**
* @brief Process pending UV events.
*
Expand All @@ -387,7 +385,7 @@ JL_DLLEXPORT int jl_process_events(void)
jl_task_t *ct = jl_current_task;
uv_loop_t *loop = jl_io_loop;
jl_gc_safepoint_(ct->ptls);
if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == 0)) {
if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == jl_atomic_load_relaxed(&io_loop_tid))) {
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
JL_PROBE_RT_START_PROCESS_EVENTS(ct);
loop->stop_flag = 0;
Expand Down Expand Up @@ -637,7 +635,7 @@ JL_DLLEXPORT int jl_fs_write(uv_os_fd_t handle, const char *data, size_t len,
{
jl_task_t *ct = jl_get_current_task();
// TODO: fix this cheating
if (jl_get_safe_restore() || ct == NULL || jl_atomic_load_relaxed(&ct->tid) != 0)
if (jl_get_safe_restore() || ct == NULL || jl_atomic_load_relaxed(&ct->tid) != jl_atomic_load_relaxed(&io_loop_tid))
#ifdef _OS_WINDOWS_
return WriteFile(handle, data, len, NULL, NULL);
#else
Expand Down Expand Up @@ -718,7 +716,7 @@ JL_DLLEXPORT void jl_uv_puts(uv_stream_t *stream, const char *str, size_t n)

// TODO: Hack to make CoreIO thread-safer
jl_task_t *ct = jl_get_current_task();
if (ct == NULL || jl_atomic_load_relaxed(&ct->tid) != 0) {
if (ct == NULL || jl_atomic_load_relaxed(&ct->tid) != jl_atomic_load_relaxed(&io_loop_tid)) {
if (stream == JL_STDOUT) {
fd = UV_STDOUT_FD;
}
Expand Down
23 changes: 2 additions & 21 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,6 @@ JL_DLLIMPORT void *__tsan_get_current_fiber(void);
JL_DLLIMPORT void __tsan_destroy_fiber(void *fiber);
JL_DLLIMPORT void __tsan_switch_to_fiber(void *fiber, unsigned flags);
#endif
#ifdef __cplusplus
}
#endif

// Remove when C11 is required for C code.
#ifndef static_assert
# ifndef __cplusplus
// C11 should already have `static_assert` from `<assert.h>` so there's no need
// to check C version.
# ifdef __GNUC__
# define static_assert _Static_assert
# else
# define static_assert(...)
# endif
# endif
// For C++, C++11 or MSVC is required. Both provide `static_assert`.
#endif

#ifndef alignof
# ifndef __cplusplus
Expand Down Expand Up @@ -182,10 +165,8 @@ extern jl_mutex_t jl_uv_mutex;
extern _Atomic(int) jl_uv_n_waiters;
void JL_UV_LOCK(void);
#define JL_UV_UNLOCK() JL_UNLOCK(&jl_uv_mutex)

#ifdef __cplusplus
extern "C" {
#endif
extern _Atomic(unsigned) _threadedregion;
extern _Atomic(uint16_t) io_loop_tid;

int jl_running_under_rr(int recheck) JL_NOTSAFEPOINT;

Expand Down
14 changes: 6 additions & 8 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,6 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
}


extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
{
jl_task_t *ct = jl_current_task;
Expand All @@ -440,7 +438,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,

jl_cpu_pause();
jl_ptls_t ptls = ct->ptls;
if (sleep_check_after_threshold(&start_cycles) || (ptls->tid == 0 && (!jl_atomic_load_relaxed(&_threadedregion) || wait_empty))) {
if (sleep_check_after_threshold(&start_cycles) || (ptls->tid == jl_atomic_load_relaxed(&io_loop_tid) && (!jl_atomic_load_relaxed(&_threadedregion) || wait_empty))) {
// acquire sleep-check lock
jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
jl_fence(); // [^store_buffering_1]
Expand Down Expand Up @@ -487,7 +485,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
}
else if (ptls->tid == 0) {
else if (ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
uvlock = 1;
JL_UV_LOCK();
}
Expand Down Expand Up @@ -528,7 +526,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
start_cycles = 0;
continue;
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == jl_atomic_load_relaxed(&io_loop_tid)) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (set_not_sleeping(ptls)) {
Expand All @@ -545,10 +543,10 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
assert(wasrunning);
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
// so make sure io_loop_tid is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
if (ptls->tid != 0) {
if (ptls->tid != jl_atomic_load_relaxed(&io_loop_tid)) {
uv_mutex_lock(&ptls->sleep_lock);
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
uv_mutex_unlock(&ptls->sleep_lock);
Expand Down Expand Up @@ -599,7 +597,7 @@ void scheduler_delete_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT
jl_fence();
if (notsleeping) {
if (jl_atomic_load_relaxed(&nrunning) == 1) {
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[0];
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[jl_atomic_load_relaxed(&io_loop_tid)];
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
uv_mutex_lock(&ptls2->sleep_lock);
Expand Down
25 changes: 23 additions & 2 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,8 @@ void jl_start_threads(void)
uv_barrier_wait(&thread_init_done);
}

_Atomic(unsigned) _threadedregion; // HACK: keep track of whether to prioritize IO or threading
_Atomic(unsigned) _threadedregion; // keep track of whether to prioritize IO or threading
_Atomic(uint16_t) io_loop_tid; // mark which thread is assigned to run the uv_loop

JL_DLLEXPORT int jl_in_threaded_region(void)
{
Expand All @@ -821,7 +822,27 @@ JL_DLLEXPORT void jl_exit_threaded_region(void)
JL_UV_UNLOCK();
// make sure thread 0 is not using the sleep_lock
// so that it may enter the libuv event loop instead
jl_wakeup_thread(0);
jl_fence();
jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid));
}
}

JL_DLLEXPORT void jl_set_io_loop_tid(int16_t tid)
{
if (tid < 0 || tid >= jl_atomic_load_relaxed(&jl_n_threads)) {
// TODO: do we care if this thread has exited or not started yet,
// since ptls2 might not be defined yet and visible on all threads yet
return;
}
jl_atomic_store_relaxed(&io_loop_tid, tid);
jl_fence();
if (jl_atomic_load_relaxed(&_threadedregion) == 0) {
// make sure the previous io_loop_tid leaves the libuv event loop
JL_UV_LOCK();
JL_UV_UNLOCK();
// make sure thread io_loop_tid is not using the sleep_lock
// so that it may enter the libuv event loop instead
jl_wakeup_thread(tid);
}
}

Expand Down