Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded custom grouped operations with single-row result #2588

Merged
merged 11 commits into from
Feb 10, 2021
12 changes: 7 additions & 5 deletions docs/src/man/split_apply_combine.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,13 @@ It is allowed to mix single values and vectors if multiple transformations
are requested. In this case single value will be repeated to match the length
of columns specified by returned vectors.

A separate task is spawned for each specified transformation, allowing for
parallel operation when several transformations are requested and Julia was
started with more than one thread. Passed transformation functions should
therefore not modify global variables (i.e. they should be pure), or use
locks to control parallel accesses.
A separate task is spawned for each specified transformation; each transformation
then spawns as many tasks as Julia threads, and splits processing of groups across
them (however, currently transformations with optimized implementations like `sum`
bkamins marked this conversation as resolved.
Show resolved Hide resolved
and transformations that return multiple rows use a single task for all groups).
This allows for parallel operation when Julia was started with more than one
thread. Passed transformation functions should therefore not modify global variables
(i.e. they should be pure), or use locks to control parallel accesses.

To apply `function` to each row instead of whole columns, it can be wrapped in a
`ByRow` struct. `cols` can be any column indexing syntax, in which case
Expand Down
11 changes: 7 additions & 4 deletions src/abstractdataframe/selection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,13 @@ const TRANSFORMATION_COMMON_RULES =
`copycols=false`, a `SubDataFrame` is returned without copying columns.

If a `GroupedDataFrame` is passed, a separate task is spawned for each
specified transformation, allowing for parallel operation when several
transformations are requested and Julia was started with more than one thread.
Passed transformation functions should therefore not modify global variables
(i.e. they should be pure), or use locks to control parallel accesses.
specified transformation; each transformation then spawns as many tasks
as Julia threads, and splits processing of groups across them
(however, currently transformations with optimized implementations like `sum`
and transformations that return multiple rows use a single task for all groups).
This allows for parallel operation when Julia was started with more than one
thread. Passed transformation functions should therefore not modify global
variables (i.e. they should be pure), or use locks to control parallel accesses.
In the future, parallelism may be extended to other cases, so this requirement
also holds for `DataFrame` inputs.
"""
Expand Down
161 changes: 136 additions & 25 deletions src/groupeddataframe/complextransforms.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function _combine_with_first(first::Union{NamedTuple, DataFrameRow, AbstractData
f, gd, incols, targetcolnames,
firstmulticol)
else
outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, 1, 1,
outcols, finalcolnames = _combine_rows_with_first!(first, initialcols,
f, gd, incols, targetcolnames,
firstmulticol)
end
Expand Down Expand Up @@ -92,9 +92,92 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector},
return nothing
end

function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
function _combine_rows_with_first_task!(tid::Integer,
rowstart::Integer,
rowend::Integer,
rownext::Integer,
outcols::NTuple{<:Any, AbstractVector},
outcolsref::Ref{NTuple{<:Any, AbstractVector}},
type_widened::AbstractVector{Bool},
widen_type_lock::ReentrantLock,
f::Base.Callable,
gd::GroupedDataFrame,
starts::AbstractVector{<:Integer},
ends::AbstractVector{<:Integer},
incols::Union{Nothing, AbstractVector,
Tuple, NamedTuple},
colnames::NTuple{<:Any, Symbol},
firstmulticol::Val)
j = nothing
gdidx = gd.idx
local newoutcols
for i in rownext:rowend
row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
j = fill_row!(row, outcols, i, 1, colnames)
if j !== nothing # Need to widen column
# If another thread is already widening outcols, wait until it's done
lock(widen_type_lock)
try
newoutcols = outcolsref[]
# Workaround for julia#15276
newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row
ntuple(length(newoutcols)) do k
S = typeof(row[k])
T = eltype(newoutcols[k])
U = promote_type(S, T)
if S <: T || U <: T
newoutcols[k]
else
type_widened .= true
Tables.allocatecolumn(U, length(newoutcols[k]))
end
end
end
for k in 1:length(outcols)
if outcols[k] !== newoutcols[k]
copyto!(newoutcols[k], rowstart,
outcols[k], rowstart, i - rowstart + (k < j))
end
end
j = fill_row!(row, newoutcols, i, j, colnames)
@assert j === nothing # eltype is guaranteed to match

outcolsref[] = newoutcols
type_widened[tid] = false
finally
unlock(widen_type_lock)
end
return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends,
incols, colnames, firstmulticol)
end
# If other thread widened columns, copy already processed data to new vectors
# This doesn't have to happen immediately (hence type_widened isn't atomic),
# but the more we wait the more data will have to be copied
if type_widened[tid]
nalimilan marked this conversation as resolved.
Show resolved Hide resolved
lock(widen_type_lock) do
type_widened[tid] = false
newoutcols = outcolsref[]
for k in 1:length(outcols)
# Check whether this particular column has been widened
if outcols[k] !== newoutcols[k]
copyto!(newoutcols[k], rowstart,
outcols[k], rowstart, i - rowstart + 1)
end
end
end
return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends,
incols, colnames, firstmulticol)
end
end
return outcols
end

function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow},
outcols::NTuple{N, AbstractVector},
rowstart::Integer, colstart::Integer,
f::Base.Callable, gd::GroupedDataFrame,
incols::Union{Nothing, AbstractVector, Tuple, NamedTuple},
colnames::NTuple{N, Symbol},
Expand All @@ -107,32 +190,60 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
# handle empty GroupedDataFrame
len == 0 && return outcols, colnames

# Handle first group
j = fill_row!(first, outcols, rowstart, colstart, colnames)
@assert j === nothing # eltype is guaranteed to match
# Handle remaining groups
@inbounds for i in rowstart+1:len
row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
j = fill_row!(row, outcols, i, 1, colnames)
if j !== nothing # Need to widen column type
local newcols
let i = i, j = j, outcols=outcols, row=row # Workaround for julia#15276
newcols = ntuple(length(outcols)) do k
S = typeof(row[k])
T = eltype(outcols[k])
U = promote_type(S, T)
if S <: T || U <: T
outcols[k]
else
copyto!(Tables.allocatecolumn(U, length(outcols[k])),
1, outcols[k], 1, k >= j ? i-1 : i)
end
# Handle groups other than the first one (which is handled below)
# Create up to one task per thread
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
@static if VERSION >= v"1.4"
basesize = max(1, cld(len - 1, Threads.nthreads()))
partitions = Iterators.partition(2:len, basesize)
else
partitions = (2:len,)
end
widen_type_lock = ReentrantLock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you create a new lock and not use the GroupedDataFrame lock? Is the other lock used in the part of code that does parallel processing of different transformations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory AFAICT we don't have to take gd.lazy_lock, i.e. it should be possible to compute indices in parallel without conflicting with what we're doing here. But yeah, since the code here requires indices to have been computed, gd.lazy_lock will never be locked when we are here, so it doesn't make a difference and I can reuse gd.lazy_lock for simplicity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about it actually it is OK to use a separate lock I think. The reason is that if you run 2 transformations that produce one row then you want separate locks for both of them.

Still - as commented below - it would be good to have a benchmark of something like:

combine($gd, :y => (y -> sum(y)) => :y1, :y => (y -> sum(y)) => :y2);

(so that we can see what happens to the performance when we run in parallel two transformations that themselves get run in parallel)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes that's probably why I used separate locks. :-)

I'll run more benchmarks after fixing the PR.

outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols)
type_widened = fill(false, length(partitions))
tasks = Vector{Task}(undef, length(partitions))
for (tid, idx) in enumerate(partitions)
tasks[tid] =
@spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
outcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstmulticol)
end

# Workaround JuliaLang/julia#38931:
# we want to preserve the exception type thrown in user code,
# and print the backtrace corresponding to it
for t in tasks
try
wait(t)
catch e
throw(t.exception)
end
end

# Copy data for any tasks that finished before others widened columns
outcols = outcolsref[]
for (tid, idx) in enumerate(partitions)
if type_widened[tid]
oldoutcols = fetch(tasks[tid])
for k in 1:length(outcols)
# Check whether this particular column has been widened
if oldoutcols[k] !== outcols[k]
nalimilan marked this conversation as resolved.
Show resolved Hide resolved
copyto!(outcols[k], first(idx), oldoutcols[k], first(idx),
last(idx) - first(idx) + 1)
end
end
return _combine_rows_with_first!(row, newcols, i, j,
f, gd, incols, colnames, firstmulticol)
end
end

# Handle first group
# This is done at the end to write directly to the final outcols
j1 = fill_row!(firstrow, outcols, 1, 1, colnames)
@assert j1 === nothing # eltype is guaranteed to match

return outcols, colnames
end

Expand Down
42 changes: 42 additions & 0 deletions test/grouping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3577,4 +3577,46 @@ end
end
end

@testset "result eltype widening from different tasks" begin
if VERSION < v"1.5"
Base.convert(::Type{Union{Missing, Nothing, Float64}}, x::Int) = float(x)
end
Random.seed!(1)
for y in (Any[1, missing, missing, 2, 4],
Any[1, missing, nothing, 2.1, 'a'],
Any[1, 1, missing, 1, nothing, 1, 2.1, 1, 'a'],
Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'],
Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']),
x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y)))
df = DataFrame(x=x, y1=y, y2=reverse(y))
gd = groupby(df, :x)
res = combine(gd, :y1 => (y -> y[1]) => :y1, :y2 => (y -> y[end]) => :y2)
# sleep ensures one task will widen the result after the other is done,
# so that data has to be copied at the end
@test res ≅
combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y1,
[:x, :y2] => ((x, y) -> (sleep((x == [5])/10); y[end])) => :y2) ≅
combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y1,
[:x, :y2] => ((x, y) -> (sleep(x[1]/100); y[end])) => :y2) ≅
combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); y[1])) => :y1,
[:x, :y2] => ((x, y) -> (sleep(rand()/10); y[end])) => :y2)

if df.x == 1:nrow(df)
@test res ≅ df
end

res = combine(gd, :y1 => (y -> (y1=y[1], y2=y[end])) => AsTable,
:y2 => (y -> (y3=y[1], y4=y[end])) => AsTable)
# sleep ensures one task will widen the result after the other is done,
# so that data has to be copied at the end
@test res ≅
combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=y[end]))) => AsTable,
[:x, :y2] => ((x, y) -> (sleep((x == [5])/10); (y3=y[1], y4=y[end]))) => AsTable) ≅
combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=y[end]))) => AsTable,
[:x, :y2] => ((x, y) -> (sleep(x[1]/100); (y3=y[1], y4=y[end]))) => AsTable) ≅
combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=y[end]))) => AsTable,
[:x, :y2] => ((x, y) -> (sleep(rand()/10); (y3=y[1], y4=y[end]))) => AsTable)
end
nalimilan marked this conversation as resolved.
Show resolved Hide resolved
end

end # module