Tv sched metrics #24

Closed · wants to merge 5 commits

20 changes: 10 additions & 10 deletions base/lock.jl
@@ -84,6 +84,7 @@ end
#@assert rl.reentrancy_cnt === 0
rl.reentrancy_cnt = 0x0000_0001
@atomic :release rl.locked_by = ct
# TODO: returning without enable_finalizers() ??
return true
end
GC.enable_finalizers()
@@ -109,6 +110,8 @@ Each `lock` must be matched by an [`unlock`](@ref).
# it was unlocked, so try to lock it ourself
_trylock(rl, current_task()) && break
else # it was locked, so now wait for the release to notify us
# TODO: verify it is impossible for havelock == 0x00 before
# waiting here.
wait(c)
end
end
@@ -135,16 +138,13 @@ internal counter and return immediately.
rl.reentrancy_cnt = n
if n == 0x0000_00000
@atomic :monotonic rl.locked_by = nothing
- if (@atomicswap :release rl.havelock = 0x00) == 0x02
-     (@noinline function notifywaiters(rl)
-         cond_wait = rl.cond_wait
-         lock(cond_wait)
-         try
-             notify(cond_wait)
-         finally
-             unlock(cond_wait)
-         end
-     end)(rl)
+ @atomic :release rl.havelock = 0x00
+ cond_wait = rl.cond_wait
+ lock(cond_wait)
+ try
+     notify(cond_wait, all=false)
+ finally
+     unlock(cond_wait)
end
return true
end
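
A quick way to exercise both changed paths in this hunk (the waiter parking on `rl.cond_wait` in `lock`, and the new `notify(cond_wait, all=false)` in `unlock`) is a small contention script. This is only an illustrative sketch with an arbitrary task count and hold time, not part of the PR:

    # Hypothetical smoke test: several tasks contend on one ReentrantLock so that
    # the slow lock path (wait on rl.cond_wait) and the rewritten unlock path
    # (notify(cond_wait, all=false)) are both exercised. Run with e.g. julia -t4.
    const rl = ReentrantLock()
    @sync for i in 1:8
        Threads.@spawn begin
            lock(rl)            # contended tasks park in rl.cond_wait
            try
                sleep(0.01)     # hold the lock briefly to force contention
            finally
                unlock(rl)      # now wakes at most one waiter
            end
        end
    end
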
5 changes: 5 additions & 0 deletions base/partr.jl
@@ -94,12 +94,16 @@ end


function multiq_insert(task::Task, priority::UInt16)
ccall(:jl_tv_multiq_p_inc, Cvoid, ())

# tpid = task pool id
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task)
heap_p = multiq_size(tpid)
tp = tpid + 1

task.priority = priority

# TODO: task pushed to a randomly chosen thread
rn = cong(heap_p, cong_unbias[tp])
tpheaps = heaps[tp]
while !trylock(tpheaps[rn].lock)
@@ -174,6 +178,7 @@ function multiq_deletemin()
prio1 = heap.tasks[1].priority
end
@atomic :monotonic heap.priority = prio1
ccall(:jl_tv_multiq_m_inc, Cvoid, ())
unlock(heap.lock)

return task
22 changes: 22 additions & 0 deletions base/task.jl
@@ -635,6 +635,7 @@ function task_done_hook(t::Task)
# Clear sigatomic before waiting
sigatomic_end()
try
ccall(:jl_tv_tasks_waiting_m_inc, Cvoid, ()) ## Kludge-
wait() # this will not return
catch e
# If an InterruptException happens while blocked in the event loop, try handing
@@ -801,6 +802,7 @@ function schedule(t::Task, @nospecialize(arg); error=false)
t.queue === nothing || Base.error("schedule: Task not runnable")
setfield!(t, :result, arg)
end
# TODO: how do we ensure that the same task is not enqueued multiple times?
enq_work(t)
return t
end
@@ -834,6 +836,10 @@ immediately yields to `t` before calling the scheduler.
function yield(t::Task, @nospecialize(x=nothing))
(t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
t.result = x
# ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
# if hash(time()) % 1000 == 0
# print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
# end
enq_work(current_task())
set_next_task(t)
return try_yieldto(ensure_rescheduled)
@@ -855,6 +861,10 @@ function yieldto(t::Task, @nospecialize(x=nothing))
elseif t._state === task_state_failed
throw(t.result)
end
# ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
# if hash(time()) % 1000 == 0
# print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
# end
t.result = x
set_next_task(t)
return try_yieldto(identity)
@@ -881,6 +891,10 @@ end

# yield to a task, throwing an exception in it
function throwto(t::Task, @nospecialize exc)
# ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
# if hash(time()) % 1000 == 0
# print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
# end
t.result = exc
t._isexception = true
set_next_task(t)
@@ -933,12 +947,20 @@ checktaskempty = Partr.multiq_check_empty
end

function wait()
ccall(:jl_tv_tasks_waiting_p_inc, Cvoid, ())
# if hash(time()) % 1000 == 0
# print("TASK-YIELD:\n$(sprint(Base.show_backtrace, Base.stacktrace()))\n\n\n")
# end
GC.safepoint()
W = workqueue_for(Threads.threadid())
poptask(W)
result = try_yieldto(ensure_rescheduled)
# TODO: how does this call to process_events() interact with locks / conditions?
# First thing a task does after waking is to process events?
# Will there be contention on libuv lock?
process_events()
# return when we come out of the queue
ccall(:jl_tv_tasks_waiting_m_inc, Cvoid, ())
return result
end

1 change: 1 addition & 0 deletions base/threadingconstructs.jl
@@ -324,6 +324,7 @@ macro spawn(args...)
let $(letargs...)
local task = Task($thunk)
task.sticky = false
# TODO: return value from jl_set_task_threadpoolid not checked
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
if $(Expr(:islocal, var))
put!($var, task)
5 changes: 5 additions & 0 deletions src/jl_exported_funcs.inc
@@ -475,6 +475,11 @@
XX(jl_try_substrtof) \
XX(jl_tty_set_mode) \
XX(jl_tupletype_fill) \
XX(jl_tv_multiq_p_inc) \
XX(jl_tv_multiq_m_inc) \
XX(jl_tv_tasks_waiting_p_inc) \
XX(jl_tv_tasks_waiting_m_inc) \
XX(jl_tv_getmetric) \
XX(jl_typeassert) \
XX(jl_typeinf_begin) \
XX(jl_typeinf_end) \
9 changes: 9 additions & 0 deletions src/julia_internal.h
@@ -18,6 +18,15 @@
#define sleep(x) Sleep(1000*x)
#endif

extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_m;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_m;

#ifdef __cplusplus
extern "C" {
#endif
16 changes: 14 additions & 2 deletions src/partr.c
@@ -30,6 +30,7 @@ static const int16_t sleeping = 1;
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
// ^^^ ??? TODO
// information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
@@ -187,6 +188,9 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
*start_cycles = jl_hrtime();
return 0;
}
// TODO: jl_hrtime() is a wall clock timestamp. This OS thread is not guaranteed to
// run continuously; there might be a context switch, and this thread could resume
// well after sleep_threshold has elapsed?
uint64_t elapsed_cycles = jl_hrtime() - (*start_cycles);
if (elapsed_cycles >= sleep_threshold) {
*start_cycles = 0;
@@ -195,12 +199,15 @@
return 0;
}


// this doesn't guarantee that on return the thread is waking or awake.
// there is a race condition here where the other thread goes to sleep just
// after this thread checks its state and sees !(jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping)
static int wake_thread(int16_t tid)
{
jl_ptls_t other = jl_all_tls_states[tid];
int8_t state = sleeping;

// TODO: use of condition variable here doesn't adhere to required discipline?
if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
@@ -255,6 +262,8 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
}
// check if the other threads might be sleeping
if (tid == -1) {
// TODO: every thread woken up when something added to multi-queue??

// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
@@ -281,7 +290,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
jl_task_t *task = (jl_task_t*)jl_apply_generic(trypoptask, &q, 1);
if (jl_typeis(task, jl_task_type)) {
int self = jl_atomic_load_relaxed(&jl_current_task->tid);
- jl_set_task_tid(task, self);
+ jl_set_task_tid(task, self); // TODO: return value not checked
return task;
}
return NULL;
@@ -302,6 +311,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
}

// TODO: what is _threadedregion?
extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
@@ -420,7 +430,9 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&sleep_locks[ptls->tid]);
while (may_sleep(ptls)) {
jl_atomic_fetch_add_relaxed(&jl_tv_threads_waiting_p, 1);
uv_cond_wait(&wake_signals[ptls->tid], &sleep_locks[ptls->tid]);
jl_atomic_fetch_add_relaxed(&jl_tv_threads_waiting_m, 1);
// TODO: help with gc work here, if applicable
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
Binary file added src/partr.pdf
6 changes: 6 additions & 0 deletions src/signals-unix.c
@@ -876,6 +876,12 @@ static void *signal_listener(void *arg)

// notify thread to resume
jl_thread_resume(i, sig);

if (!critical)
{
// Kludge: only sample a single thread, to get an unbiased sample
break;
}
}
jl_unlock_profile();
}
6 changes: 6 additions & 0 deletions src/task.c
@@ -228,6 +228,7 @@ static _Atomic(jl_function_t*) task_done_hook_func JL_GLOBALLY_ROOTED = NULL;

void JL_NORETURN jl_finish_task(jl_task_t *t)
{
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_m, 1);
jl_task_t *ct = jl_current_task;
JL_PROBE_RT_FINISH_TASK(ct);
JL_SIGATOMIC_BEGIN();
@@ -518,6 +519,7 @@ static void ctx_switch(jl_task_t *lastt)

JL_DLLEXPORT void jl_switch(void)
{
//jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1);
jl_task_t *ct = jl_current_task;
jl_ptls_t ptls = ct->ptls;
jl_task_t *t = ptls->next_task;
@@ -820,6 +822,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion
#ifdef _COMPILER_TSAN_ENABLED_
t->ctx.tsan_state = __tsan_create_fiber(0);
#endif
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_p, 1);
return t;
}

@@ -842,6 +845,7 @@ JL_DLLEXPORT jl_value_t *jl_get_root_task(void)
return (jl_value_t*)ct->ptls->root_task;
}

// TODO: this function has no callers?
JL_DLLEXPORT void jl_task_wait()
{
static jl_function_t *wait_func = NULL;
@@ -914,6 +918,7 @@ CFI_NORETURN
jl_atomic_store_release(&pt->tid, -1);
#endif

//jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1);
ct->started = 1;
JL_PROBE_RT_START_TASK(ct);
if (jl_atomic_load_relaxed(&ct->_isexception)) {
Expand All @@ -939,6 +944,7 @@ CFI_NORETURN
skip_pop_exception:;
}
ct->result = res;
//jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_m, 1);
jl_gc_wb(ct, ct->result);
jl_finish_task(ct);
jl_gc_debug_critical_error();
53 changes: 53 additions & 0 deletions src/threading.c
@@ -291,6 +291,59 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0;

JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_p = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_waiting_m = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_p = 0;
JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_waiting_m = 0;

JL_DLLEXPORT void jl_tv_multiq_p_inc(void)
{ jl_atomic_fetch_add_relaxed(&jl_tv_multiq_p, 1); }

JL_DLLEXPORT void jl_tv_multiq_m_inc(void)
{ jl_atomic_fetch_add_relaxed(&jl_tv_multiq_m, 1); }

JL_DLLEXPORT _Atomic(uint64_t) jl_tv_dbg_counter = 0;

JL_DLLEXPORT void jl_tv_tasks_waiting_p_inc(void)
{
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_waiting_p, 1);
if (jl_atomic_fetch_add_relaxed(&jl_tv_dbg_counter, 1) % 521 == 257)
{
JL_TRY {
jl_error(""); // get a backtrace
}
JL_CATCH {
jl_printf((JL_STREAM*)STDERR_FILENO, "\n\nSample of task waiting:\n");
jlbacktrace(); // written to STDERR_FILENO
}
}
}

JL_DLLEXPORT void jl_tv_tasks_waiting_m_inc(void)
{
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_waiting_m, 1);
}

JL_DLLEXPORT int jl_tv_getmetric(int i)
{
switch(i)
{
case 1: return jl_atomic_load_relaxed(&jl_tv_threads_waiting_p);
case 2: return jl_atomic_load_relaxed(&jl_tv_threads_waiting_m);
case 3: return jl_atomic_load_relaxed(&jl_tv_tasks_p);
case 4: return jl_atomic_load_relaxed(&jl_tv_tasks_m);
case 5: return jl_atomic_load_relaxed(&jl_tv_multiq_p);
case 6: return jl_atomic_load_relaxed(&jl_tv_multiq_m);
case 7: return jl_atomic_load_relaxed(&jl_tv_tasks_waiting_p);
case 8: return jl_atomic_load_relaxed(&jl_tv_tasks_waiting_m);
default: return 0;
}
}

// return calling thread's ID
JL_DLLEXPORT int16_t jl_threadid(void)
{
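
With `jl_tv_getmetric` exported (see the src/jl_exported_funcs.inc hunk above), the counters can be polled from the Julia side. A minimal sketch, assuming a julia built from this branch: the helper names are mine, the index mapping mirrors the switch above, and treating each `_p`/`_m` pair as cumulative increments/decrements whose difference is the current level is my reading of the naming, not something the patch states:

    # Sketch only: poll the tv scheduler counters through the exported C entry point.
    tv_metric(i::Integer) = ccall(:jl_tv_getmetric, Cint, (Cint,), i)

    # Indices follow the switch in jl_tv_getmetric (src/threading.c).
    threads_waiting() = tv_metric(1) - tv_metric(2)  # OS threads parked in uv_cond_wait
    live_tasks()      = tv_metric(3) - tv_metric(4)  # jl_new_task minus jl_finish_task
    multiq_length()   = tv_metric(5) - tv_metric(6)  # multiq_insert minus multiq_deletemin
    tasks_waiting()   = tv_metric(7) - tv_metric(8)  # tasks parked in Base.wait()

    for _ in 1:10
        println((threads_waiting(), live_tasks(), multiq_length(), tasks_waiting()))
        sleep(1)
    end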