-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement mapreduce #561
Draft
vchuravy
wants to merge
3
commits into
master
Choose a base branch
from
vc/ka_mapreducedim
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+183
−13
Draft
Implement mapreduce #561
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,10 +2,10 @@ | |
|
||
const AbstractArrayOrBroadcasted = Union{AbstractArray,Broadcast.Broadcasted} | ||
|
||
# GPUArrays' mapreduce methods build on `Base.mapreducedim!`, but with an additional | ||
# argument `init` value to avoid eager initialization of `R` (if set to something). | ||
mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArrayOrBroadcasted; | ||
init=nothing) = error("Not implemented") # COV_EXCL_LINE | ||
# # GPUArrays' mapreduce methods build on `Base.mapreducedim!`, but with an additional | ||
# # argument `init` value to avoid eager initialization of `R` (if set to something). | ||
# mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArrayOrBroadcasted; | ||
# init=nothing) = error("Not implemented") # COV_EXCL_LINE | ||
# resolve ambiguities | ||
Base.mapreducedim!(f, op, R::AnyGPUArray, A::AbstractArray) = mapreducedim!(f, op, R, A) | ||
Base.mapreducedim!(f, op, R::AnyGPUArray, A::Broadcast.Broadcasted) = mapreducedim!(f, op, R, A) | ||
|
@@ -142,3 +142,182 @@ function Base.:(==)(A::AnyGPUArray, B::AnyGPUArray) | |
res = mapreduce(mapper, reducer, A, B; init=(; is_missing=false, is_equal=true)) | ||
res.is_missing ? missing : res.is_equal | ||
end | ||
|
||
|
||
import KernelAbstractions: @context | ||
|
||
@inline function reduce_group(@context, op, val::T, neutral, ::Val{maxitems}) where {T, maxitems} | ||
items = @groupsize()[1] | ||
item = @index(Local, Linear) | ||
|
||
# local mem for a complete reduction | ||
shared = @localmem T (maxitems,) | ||
@inbounds shared[item] = val | ||
|
||
# perform a reduction | ||
d = 1 | ||
while d < items | ||
@synchronize() # legal since cpu=false | ||
index = 2 * d * (item-1) + 1 | ||
@inbounds if index <= items | ||
other_val = if index + d <= items | ||
shared[index+d] | ||
else | ||
neutral | ||
end | ||
shared[index] = op(shared[index], other_val) | ||
end | ||
d *= 2 | ||
end | ||
|
||
# load the final value on the first item | ||
if item == 1 | ||
val = @inbounds shared[item] | ||
end | ||
|
||
return val | ||
end | ||
|
||
Base.@propagate_inbounds _map_getindex(args::Tuple, I) = ((args[1][I]), _map_getindex(Base.tail(args), I)...) | ||
Base.@propagate_inbounds _map_getindex(args::Tuple{Any}, I) = ((args[1][I]),) | ||
Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = () | ||
|
||
# Reduce an array across the grid. All elements to be processed can be addressed by the | ||
# product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have | ||
# singleton entries for the dimensions that should be reduced (and vice versa). | ||
@kernel cpu=false function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R, As...) | ||
# decompose the 1D hardware indices into separate ones for reduction (across items | ||
# and possibly groups if it doesn't fit) and other elements (remaining groups) | ||
localIdx_reduce = @index(Local, Linear) | ||
localDim_reduce = @groupsize()[1] | ||
groupIdx_reduce, groupIdx_other = fldmod1(@index(Group, Linear), length(Rother)) | ||
numGroups = length(KernelAbstractions.blocks(KernelAbstractions.__iterspace(@context()))) | ||
groupDim_reduce = numGroups ÷ length(Rother) | ||
|
||
# group-based indexing into the values outside of the reduction dimension | ||
# (that means we can safely synchronize items within this group) | ||
iother = groupIdx_other | ||
@inbounds if iother <= length(Rother) | ||
Iother = Rother[iother] | ||
|
||
# load the neutral value | ||
Iout = CartesianIndex(Tuple(Iother)..., groupIdx_reduce) | ||
neutral = if neutral === nothing | ||
R[Iout] | ||
else | ||
neutral | ||
end | ||
|
||
val = op(neutral, neutral) | ||
|
||
# reduce serially across chunks of input vector that don't fit in a group | ||
ireduce = localIdx_reduce + (groupIdx_reduce - 1) * localDim_reduce | ||
while ireduce <= length(Rreduce) | ||
Ireduce = Rreduce[ireduce] | ||
J = max(Iother, Ireduce) | ||
val = op(val, f(_map_getindex(As, J)...)) | ||
ireduce += localDim_reduce * groupDim_reduce | ||
end | ||
|
||
val = reduce_group(@context(), op, val, neutral, maxitems) | ||
|
||
# write back to memory | ||
if localIdx_reduce == 1 | ||
R[Iout] = val | ||
end | ||
end | ||
end | ||
|
||
## COV_EXCL_STOP | ||
|
||
function mapreducedim!(f::F, op::OP, R::AnyGPUArray{T}, A::AbstractArrayOrBroadcasted; | ||
init=nothing) where {F, OP, T} | ||
Base.check_reducedims(R, A) | ||
length(A) == 0 && return R # isempty(::Broadcasted) iterates | ||
|
||
# add singleton dimensions to the output container, if needed | ||
if ndims(R) < ndims(A) | ||
dims = Base.fill_to_length(size(R), 1, Val(ndims(A))) | ||
R = reshape(R, dims) | ||
end | ||
|
||
# iteration domain, split in two: one part covers the dimensions that should | ||
# be reduced, and the other covers the rest. combining both covers all values. | ||
Rall = CartesianIndices(axes(A)) | ||
Rother = CartesianIndices(axes(R)) | ||
Rreduce = CartesianIndices(ifelse.(axes(A) .== axes(R), Ref(Base.OneTo(1)), axes(A))) | ||
# NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a | ||
# CartesianIndices object with UnitRanges that behave badly on the GPU. | ||
@assert length(Rall) == length(Rother) * length(Rreduce) | ||
|
||
# allocate an additional, empty dimension to write the reduced value to. | ||
# this does not affect the actual location in memory of the final values, | ||
# but allows us to write a generalized kernel supporting partial reductions. | ||
R′ = reshape(R, (size(R)..., 1)) | ||
|
||
# how many items do we want? | ||
# | ||
# items in a group work together to reduce values across the reduction dimensions; | ||
# we want as many as possible to improve algorithm efficiency and execution occupancy. | ||
wanted_items = length(Rreduce) | ||
function compute_items(max_items) | ||
if wanted_items > max_items | ||
max_items | ||
else | ||
wanted_items | ||
end | ||
end | ||
|
||
# how many items can we launch? | ||
# | ||
# we might not be able to launch all those items to reduce each slice in one go. | ||
# that's why each items also loops across their inputs, processing multiple values | ||
# so that we can span the entire reduction dimension using a single item group. | ||
|
||
# group size is restricted by local memory | ||
# max_lmem_elements = compute_properties(device()).maxSharedLocalMemory ÷ sizeof(T) | ||
# max_items = min(compute_properties(device()).maxTotalGroupSize, | ||
# compute_items(max_lmem_elements ÷ 2)) | ||
# TODO: dynamic local memory to avoid two compilations | ||
|
||
# let the driver suggest a group size | ||
# args = (f, op, init, Val(max_items), Rreduce, Rother, R′, A) | ||
# kernel_args = kernel_convert.(args) | ||
# kernel_tt = Tuple{Core.Typeof.(kernel_args)...} | ||
# kernel = zefunction(partial_mapreduce_device, kernel_tt) | ||
# reduce_items = launch_configuration(kernel) | ||
reduce_items = 512 | ||
reduce_kernel = partial_mapreduce_device(get_backend(R), (reduce_items,)) | ||
|
||
# how many groups should we launch? | ||
# | ||
# even though we can always reduce each slice in a single item group, that may not be | ||
# optimal as it might not saturate the GPU. we already launch some groups to process | ||
# independent dimensions in parallel; pad that number to ensure full occupancy. | ||
other_groups = length(Rother) | ||
reduce_groups = cld(length(Rreduce), reduce_items) | ||
|
||
# determine the launch configuration | ||
items = reduce_items | ||
groups = reduce_groups*other_groups | ||
|
||
ndrange = groups*items | ||
|
||
# perform the actual reduction | ||
if reduce_groups == 1 | ||
# we can cover the dimensions to reduce using a single group | ||
reduce_kernel(f, op, init, Val(items), Rreduce, Rother, R′, A; ndrange) | ||
else | ||
# we need multiple steps to cover all values to reduce | ||
partial = similar(R, (size(R)..., reduce_groups)) | ||
if init === nothing | ||
# without an explicit initializer we need to copy from the output container | ||
partial .= R | ||
end | ||
reduce_kernel(f, op, init, Val(items), Rreduce, Rother, partial, A; ndrange) | ||
|
||
GPUArrays.mapreducedim!(identity, op, R′, partial; init=init) | ||
Comment on lines
+311
to
+319
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This may be a good time to add support for grid stride loops to KA.jl and handle this with a single kernel launch + atomic writes to global memory? |
||
end | ||
|
||
return R | ||
end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But also has to become dynamic, of course.