Implement work stealing #373

Merged: 7 commits, Apr 28, 2023
1 change: 1 addition & 0 deletions Project.toml
@@ -5,6 +5,7 @@ version = "0.16.3"
[deps]
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
ContextVariablesX = "6add18c4-b38d-439d-96f6-d6bc489c04c5"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
36 changes: 36 additions & 0 deletions docs/src/index.md
@@ -227,5 +227,41 @@ opts = Dagger.Sch.ThunkOptions(;single=1)
delayed(+)(1, 2; options=opts)
```

### Core vs. Worker Schedulers

Dagger's scheduler really consists of two kinds of entities: the "core"
scheduler and the "worker" schedulers:

The core scheduler runs on worker 1, thread 1, and is the entrypoint to tasks
which have been submitted. The core scheduler manages all task dependencies,
notifies calls to `wait` and `fetch` of task completion, and generally performs
initial task placement. The core scheduler has cached information about each
worker and their processors, and uses that information (together with metrics
about previous tasks and other aspects of the Dagger runtime) to generate a
near-optimal just-in-time task schedule.
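
As a minimal sketch of what the core scheduler handles, consider a tiny graph
built with the `delayed` API shown earlier (here realized with `collect`; the
exact values and operations are illustrative only). The core scheduler tracks
the dependency between the two tasks and places each on an available
processor:

```julia
using Dagger

# Build a tiny task graph lazily; nothing executes yet.
a = delayed(+)(1, 2)
b = delayed(*)(a, 3)

# Computing `b` submits the graph to the core scheduler on worker 1,
# which resolves `b`'s dependency on `a` and schedules both tasks.
collect(b)  # == 9
```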

The worker schedulers each run as a set of tasks across all workers and all
processors, and handle data movement and task execution. Once the core
scheduler has scheduled and launched a task, it arrives at the worker scheduler
for handling. The worker scheduler will pass the task to a queue for the
assigned processor, where it will wait until the processor has a sufficient
amount of "occupancy" for the task. Once the processor is ready for the task,
it will first fetch all arguments to the task from other workers, and then it
will execute the task, package the result into a `Chunk`, and pass that back to
the core scheduler.
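
The occupancy-based queueing can be pictured with a deliberately simplified
sketch (the types and numbers below are illustrative only, not Dagger
internals): a processor's queue admits tasks for execution only while spare
occupancy remains, and the rest wait until running work completes.

```julia
# Illustrative only: a toy per-processor queue that launches tasks while
# spare "occupancy" remains, loosely mirroring the behavior described above.
mutable struct ToyProcQueue
    capacity::Float64        # total occupancy available (1.0 == fully busy)
    used::Float64            # occupancy consumed by currently-running tasks
    pending::Vector{Float64} # queued tasks, represented by their occupancy cost
end

function try_launch!(q::ToyProcQueue)
    launched = 0
    while !isempty(q.pending) && q.used + first(q.pending) <= q.capacity
        q.used += popfirst!(q.pending)  # task begins executing on this processor
        launched += 1
    end
    return launched
end

q = ToyProcQueue(1.0, 0.0, [0.4, 0.4, 0.4])
try_launch!(q)  # launches 2 tasks; the third waits until occupancy frees up
```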

### Workload Balancing

In general, Dagger's core scheduler tries to balance workloads as much as
possible across all the available processors, but it can fail to do so
effectively when either the cached per-processor information is outdated, or
when the estimates about the task's behavior are inaccurate. To minimize the
impact of this potential workload imbalance, the worker schedulers' processors
will attempt to steal tasks from each other when they are under-occupied. Tasks
will only be stolen if their [scope](`Scopes`) matches the processor attempting
the steal, so tasks with wider scopes have better balancing potential.
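
The stealing rule can likewise be sketched in miniature (again with
illustrative types, not Dagger's internal ones): an under-occupied processor
scans another processor's queue and takes only tasks whose scope permits it
to run them.

```julia
# Illustrative only: a task may be stolen only if the stealing processor
# satisfies the task's scope. Wider scopes (more processors allowed) give
# the scheduler more freedom to rebalance work.
struct ToyTask
    allowed_procs::Set{Int}  # stands in for the task's scope
    work::Function
end

function try_steal!(thief_proc::Int, victim_queue::Vector{ToyTask})
    idx = findfirst(t -> thief_proc in t.allowed_procs, victim_queue)
    idx === nothing && return nothing  # nothing in scope for this processor
    return popat!(victim_queue, idx)   # remove and return the stolen task
end

queue = [ToyTask(Set([2]), () -> 1), ToyTask(Set([1, 2, 3]), () -> 2)]
try_steal!(1, queue)  # steals the second task: its scope includes processor 1
```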

### Scheduler/Thunk Options

[`Dagger.Sch.SchedulerOptions`](@ref)
[`Dagger.Sch.ThunkOptions`](@ref)
1 change: 1 addition & 0 deletions src/Dagger.jl
@@ -36,6 +36,7 @@ include("chunks.jl")
include("compute.jl")
include("utils/clock.jl")
include("utils/system_uuid.jl")
include("utils/locked-object.jl")
include("sch/Sch.jl"); using .Sch

# Array computations
11 changes: 6 additions & 5 deletions src/processor.jl
@@ -1,5 +1,7 @@
export OSProc, Context, addprocs!, rmprocs!

import Base: @invokelatest

"""
Processor

@@ -157,8 +159,9 @@ function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...))
task = Task() do
set_tls!(tls)
TimespanLogging.prof_task_put!(tls.sch_handle.thunk_id.id)
f(args...)
@invokelatest f(args...)
end
task.sticky = true
ret = ccall(:jl_set_task_tid, Cint, (Any, Cint), task, proc.tid-1)
if ret == 0
error("jl_set_task_tid == 0")
@@ -310,8 +313,7 @@ get_tls() = (
sch_uid=task_local_storage(:_dagger_sch_uid),
sch_handle=task_local_storage(:_dagger_sch_handle),
processor=thunk_processor(),
time_utilization=task_local_storage(:_dagger_time_utilization),
alloc_utilization=task_local_storage(:_dagger_alloc_utilization),
task_spec=task_local_storage(:_dagger_task_spec),
)

"""
@@ -323,6 +325,5 @@ function set_tls!(tls)
task_local_storage(:_dagger_sch_uid, tls.sch_uid)
task_local_storage(:_dagger_sch_handle, tls.sch_handle)
task_local_storage(:_dagger_processor, tls.processor)
task_local_storage(:_dagger_time_utilization, tls.time_utilization)
task_local_storage(:_dagger_alloc_utilization, tls.alloc_utilization)
task_local_storage(:_dagger_task_spec, tls.task_spec)
end