# Simple Multithreading in Julia
This is meant to be a simple, unambitious package that provides user-friendly ways of doing task-parallel multithreaded calculations via higher-order functions, with a focus on data parallelism without needing to expose Julia's `Task` model to users.

Unlike most JuliaFolds2 packages, it is not built on top of Transducers.jl, nor is it a building block for Transducers.jl. Rather, OhMyThreads is meant to be a simpler, more maintainable, and more accessible alternative to packages like ThreadsX.jl or Folds.jl.
OhMyThreads.jl re-exports the function `chunks` from ChunkSplitters.jl, and provides the following functions:
## tmapreduce

```julia
tmapreduce(f, op, A::AbstractArray...;
           [init],
           nchunks::Int = nthreads(),
           split::Symbol = :batch,
           schedule::Symbol = :dynamic,
           outputtype::Type = Any)
```
A multithreaded function like `Base.mapreduce`. Perform a reduction over `A`, applying a single-argument function `f` to each element, and then combining them with the two-argument function `op`. `op` must be an associative function, in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined results.

For a very well-known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing

```julia
tmapreduce(√, +, [1, 2, 3, 4, 5])
```

is the parallelized version of

```julia
(√1 + √2) + (√3 + √4) + √5
```

The data is divided into chunks to be worked on in parallel using ChunkSplitters.jl.
- `init`: optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`): passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` must be commutative in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing operator `op` must be commutative in addition to being associative, or you could get incorrect results! This schedule will, however, work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
- `outputtype::Type` (default `Any`): acts as the asserted output type of the parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses StableTasks.jl; but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
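For illustration, here is a minimal sketch of `tmapreduce` in use (assuming the package is installed and Julia was started with multiple threads, e.g. `julia -t 4`):

```julia
using OhMyThreads

# Parallel sum of square roots. Since + is associative (and commutative),
# any schedule and split strategy gives the same answer.
s = tmapreduce(√, +, 1:100_000; init = 0.0)
s ≈ mapreduce(√, +, 1:100_000)  # true
```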
## treducemap

```julia
treducemap(op, f, A::AbstractArray...;
           [init],
           nchunks::Int = nthreads(),
           split::Symbol = :batch,
           schedule::Symbol = :dynamic,
           outputtype::Type = Any)
```
Like `tmapreduce`, except the order of the `f` and `op` arguments is switched. This is sometimes convenient with `do`-block notation. Perform a reduction over `A`, applying a single-argument function `f` to each element, and then combining them with the two-argument function `op`. `op` must be an associative function, in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined results.

For a very well-known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing

```julia
treducemap(+, √, [1, 2, 3, 4, 5])
```

is the parallelized version of

```julia
(√1 + √2) + (√3 + √4) + √5
```

The data is divided into chunks to be worked on in parallel using ChunkSplitters.jl.
- `init`: optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`): passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` must be commutative in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing operator `op` must be commutative in addition to being associative, or you could get incorrect results! This schedule will, however, work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
- `outputtype::Type` (default `Any`): acts as the asserted output type of the parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses StableTasks.jl; but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
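To sketch why the swapped argument order is convenient: with `treducemap`, a `do`-block supplies the reducing operator `op` rather than the mapped function (example assumes a multithreaded Julia session):

```julia
using OhMyThreads

# Same computation as tmapreduce(√, +, 1:10_000), but the do-block
# provides the (associative) reducing operator:
s = treducemap(√, 1:10_000; init = 0.0) do a, b
    a + b
end
```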
## treduce

```julia
treduce(op, A::AbstractArray...;
        [init],
        nchunks::Int = nthreads(),
        split::Symbol = :batch,
        schedule::Symbol = :dynamic,
        outputtype::Type = Any)
```
Like `tmapreduce`, except that no transforming function `f` is applied to each element; the elements of `A` are combined directly with the two-argument function `op`. `op` must be an associative function, in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined results.

For a very well-known example of `reduce`, `sum(A)` is equivalent to `reduce(+, A)`. Doing

```julia
treduce(+, [1, 2, 3, 4, 5])
```

is the parallelized version of

```julia
(1 + 2) + (3 + 4) + 5
```

The data is divided into chunks to be worked on in parallel using ChunkSplitters.jl.
- `init`: optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`): passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` must be commutative in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing operator `op` must be commutative in addition to being associative, or you could get incorrect results! This schedule will, however, work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
- `outputtype::Type` (default `Any`): acts as the asserted output type of the parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses StableTasks.jl; but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
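A minimal sketch of `treduce` (assuming a multithreaded Julia session with the package installed):

```julia
using OhMyThreads

# Parallel sum with no mapping step; + is associative, so this is safe.
s = treduce(+, 1:1_000_000; init = 0)
s == sum(1:1_000_000)  # true
```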
## tmap

```julia
tmap(f, [OutputElementType], A::AbstractArray...;
     nchunks::Int = nthreads(),
     split::Symbol = :batch,
     schedule::Symbol = :dynamic)
```
A multithreaded function like `Base.map`. Create a new container similar to `A` whose `i`th element is equal to `f(A[i])`. This container is filled in parallel on multiple tasks. The optional argument `OutputElementType` will select a specific element type for the returned container, and will generally incur fewer allocations than the version where `OutputElementType` is not specified.
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`): passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`).
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule only works if the `OutputElementType` argument is provided.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
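A small sketch of `tmap` (the explicit `Int` element type is optional, but generally reduces allocations):

```julia
using OhMyThreads

squares = tmap(x -> x^2, Int, 1:10)
squares == [x^2 for x in 1:10]  # true
```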
## tmap!

```julia
tmap!(f, out, A::AbstractArray...;
      nchunks::Int = nthreads(),
      split::Symbol = :batch,
      schedule::Symbol = :dynamic)
```
A multithreaded function like `Base.map!`. In parallel on multiple tasks, this function assigns `out[i] = f(A[i])` for each index `i` of `A` and `out`.
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`): passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`).
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
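A minimal sketch of `tmap!` writing into a preallocated output array:

```julia
using OhMyThreads

A = collect(1:10)
out = similar(A)
tmap!(x -> 2x, out, A)  # fills `out` in place, in parallel
out == 2 .* A           # true
```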
## tforeach

```julia
tforeach(f, A::AbstractArray...;
         nchunks::Int = nthreads(),
         split::Symbol = :batch,
         schedule::Symbol = :dynamic) :: Nothing
```
A multithreaded function like `Base.foreach`. Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`. That is, it is the parallel equivalent of

```julia
for x in A
    f(x)
end
```
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`): passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`).
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
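A sketch of `tforeach` for side effects. Note that, since iterations run concurrently, any side effects must be thread-safe; the atomic counter below is one way to achieve that:

```julia
using OhMyThreads
using Base.Threads: Atomic, atomic_add!

counter = Atomic{Int}(0)
tforeach(1:1000) do i
    iseven(i) && atomic_add!(counter, 1)
end
counter[] == 500  # true
```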
## tcollect

```julia
tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
         nchunks::Int = nthreads(),
         schedule::Symbol = :dynamic)
```
A multithreaded function like `Base.collect`. Essentially just calls `tmap` on the generator function and inputs. The optional argument `OutputElementType` will select a specific element type for the returned container, and will generally incur fewer allocations than the version where `OutputElementType` is not specified.
- `nchunks::Int` (default `nthreads()`): passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with load balancing, but at the expense of creating more overhead.
- `schedule::Symbol` (default `:dynamic`): determines how the parallel portions of the calculation are scheduled. Options are one of:
    - `:dynamic`: generally preferred, since it is more flexible, better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
    - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
    - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule only works if the `OutputElementType` argument is provided.
    - `:interactive`: like `:dynamic`, but runs on the high-priority interactive threadpool. This should be used very carefully, since tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as that can interfere with heartbeat processes running on the interactive threadpool.
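A minimal sketch of `tcollect` on a generator over an array (assuming a multithreaded Julia session):

```julia
using OhMyThreads

v = tcollect(Float64, x^2 / 2 for x in 1:10)
v == collect(x^2 / 2 for x in 1:10)  # true
```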