From ecfc733607526b5474090d15518d85ce587526d1 Mon Sep 17 00:00:00 2001 From: Milan Bouchet-Valat Date: Wed, 10 Feb 2021 22:00:27 +0100 Subject: [PATCH] Multithreaded custom grouped operations with single-row result (#2588) Spawn one task per thread in `_combine_rows_with_first!` so that custom grouped operations that return a single row are run in parallel. This is optimal if operations take about the same time for all groups. Spawning one task per group could be faster if these times vary a lot, but the overhead would be larger: we could add this as an option later. The implementation is somewhat tricky as output columns need to be reallocated when a new return type is detected. --- docs/src/man/split_apply_combine.md | 12 +- src/abstractdataframe/selection.jl | 11 +- src/groupeddataframe/complextransforms.jl | 198 +++++++++++++++++++--- test/grouping.jl | 70 ++++++++ test/runtests.jl | 8 +- 5 files changed, 264 insertions(+), 35 deletions(-) diff --git a/docs/src/man/split_apply_combine.md b/docs/src/man/split_apply_combine.md index b70509d66c..270d45e777 100644 --- a/docs/src/man/split_apply_combine.md +++ b/docs/src/man/split_apply_combine.md @@ -122,11 +122,13 @@ It is allowed to mix single values and vectors if multiple transformations are requested. In this case single value will be repeated to match the length of columns specified by returned vectors. -A separate task is spawned for each specified transformation, allowing for -parallel operation when several transformations are requested and Julia was -started with more than one thread. Passed transformation functions should -therefore not modify global variables (i.e. they should be pure), or use -locks to control parallel accesses. +A separate task is spawned for each specified transformation; each transformation +then spawns as many tasks as Julia threads, and splits processing of groups across +them (however, currently transformations with optimized implementations like `sum` +and transformations that return multiple rows use a single task for all groups). +This allows for parallel operation when Julia was started with more than one +thread. Passed transformation functions should therefore not modify global variables +(i.e. they should be pure), or use locks to control parallel accesses. To apply `function` to each row instead of whole columns, it can be wrapped in a `ByRow` struct. `cols` can be any column indexing syntax, in which case diff --git a/src/abstractdataframe/selection.jl b/src/abstractdataframe/selection.jl index 3e93a3b749..cd0b1934be 100644 --- a/src/abstractdataframe/selection.jl +++ b/src/abstractdataframe/selection.jl @@ -145,10 +145,13 @@ const TRANSFORMATION_COMMON_RULES = `copycols=false`, a `SubDataFrame` is returned without copying columns. If a `GroupedDataFrame` is passed, a separate task is spawned for each - specified transformation, allowing for parallel operation when several - transformations are requested and Julia was started with more than one thread. - Passed transformation functions should therefore not modify global variables - (i.e. they should be pure), or use locks to control parallel accesses. + specified transformation; each transformation then spawns as many tasks + as Julia threads, and splits processing of groups across them + (however, currently transformations with optimized implementations like `sum` + and transformations that return multiple rows use a single task for all groups). + This allows for parallel operation when Julia was started with more than one + thread. Passed transformation functions should therefore not modify global + variables (i.e. they should be pure), or use locks to control parallel accesses. In the future, parallelism may be extended to other cases, so this requirement also holds for `DataFrame` inputs. """ diff --git a/src/groupeddataframe/complextransforms.jl b/src/groupeddataframe/complextransforms.jl index 8db068c398..5d4a157ce7 100644 --- a/src/groupeddataframe/complextransforms.jl +++ b/src/groupeddataframe/complextransforms.jl @@ -57,7 +57,7 @@ function _combine_with_first(first::Union{NamedTuple, DataFrameRow, AbstractData f, gd, incols, targetcolnames, firstmulticol) else - outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, 1, 1, + outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, f, gd, incols, targetcolnames, firstmulticol) end @@ -92,9 +92,126 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector}, return nothing end -function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, +function _combine_rows_with_first_task!(tid::Integer, + rowstart::Integer, + rowend::Integer, + rownext::Integer, + outcols::NTuple{<:Any, AbstractVector}, + outcolsref::Ref{NTuple{<:Any, AbstractVector}}, + type_widened::AbstractVector{Bool}, + widen_type_lock::ReentrantLock, + f::Base.Callable, + gd::GroupedDataFrame, + starts::AbstractVector{<:Integer}, + ends::AbstractVector{<:Integer}, + incols::Union{Nothing, AbstractVector, + Tuple, NamedTuple}, + colnames::NTuple{<:Any, Symbol}, + firstmulticol::Val) + j = nothing + gdidx = gd.idx + local newoutcols + for i in rownext:rowend + row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) + j = fill_row!(row, outcols, i, 1, colnames) + if j !== nothing # Need to widen column + # If another thread is already widening outcols, wait until it's done + lock(widen_type_lock) + try + newoutcols = outcolsref[] + # Workaround for julia#15276 + newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row + ntuple(length(newoutcols)) do k + S = typeof(row[k]) + T = eltype(newoutcols[k]) + U = promote_type(S, T) + if S <: T || U <: T + newoutcols[k] + else + type_widened .= true + Tables.allocatecolumn(U, length(newoutcols[k])) + end + end + end + for k in 1:length(outcols) + if outcols[k] !== newoutcols[k] + copyto!(newoutcols[k], rowstart, + outcols[k], rowstart, i - rowstart + (k < j)) + end + end + j = fill_row!(row, newoutcols, i, j, colnames) + @assert j === nothing # eltype is guaranteed to match + + outcolsref[] = newoutcols + type_widened[tid] = false + finally + unlock(widen_type_lock) + end + return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, + incols, colnames, firstmulticol) + end + # If other thread widened columns, copy already processed data to new vectors + # This doesn't have to happen immediately (hence type_widened isn't atomic), + # but the more we wait the more data will have to be copied + if type_widened[tid] + lock(widen_type_lock) do + type_widened[tid] = false + newoutcols = outcolsref[] + for k in 1:length(outcols) + # Check whether this particular column has been widened + if outcols[k] !== newoutcols[k] + copyto!(newoutcols[k], rowstart, + outcols[k], rowstart, i - rowstart + 1) + end + end + end + return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, + incols, colnames, firstmulticol) + end + end + return outcols +end + +# CategoricalArray is thread-safe only when input and output levels are equal +# since then all values are added upfront. Otherwise, if the function returns +# CategoricalValues mixed from different pools, one thread may add values, +# which may put the invpool in an invalid state while the other one is reading from it +function isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, + incols::Union{Tuple, NamedTuple}) + anycat = any(outcols) do col + T = typeof(col) + # If the first result is missing, widening can give a CategoricalArray + # if later results are CategoricalValues + eltype(col) === Missing || + (nameof(T) === :CategoricalArray && + nameof(parentmodule(T)) === :CategoricalArrays) + end + if anycat + levs = nothing + for col in incols + T = typeof(col) + if nameof(T) === :CategoricalArray && + nameof(parentmodule(T)) === :CategoricalArrays + if levs !== nothing + levs == levels(col) || return false + else + levs = levels(col) + end + end + end + end + return true +end +isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::AbstractVector) = + isthreadsafe(outcols, (incols,)) +isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::Nothing) = true + +function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow}, outcols::NTuple{N, AbstractVector}, - rowstart::Integer, colstart::Integer, f::Base.Callable, gd::GroupedDataFrame, incols::Union{Nothing, AbstractVector, Tuple, NamedTuple}, colnames::NTuple{N, Symbol}, @@ -108,31 +225,64 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow}, len == 0 && return outcols, colnames # Handle first group - j = fill_row!(first, outcols, rowstart, colstart, colnames) - @assert j === nothing # eltype is guaranteed to match - # Handle remaining groups - @inbounds for i in rowstart+1:len - row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol) - j = fill_row!(row, outcols, i, 1, colnames) - if j !== nothing # Need to widen column type - local newcols - let i = i, j = j, outcols=outcols, row=row # Workaround for julia#15276 - newcols = ntuple(length(outcols)) do k - S = typeof(row[k]) - T = eltype(outcols[k]) - U = promote_type(S, T) - if S <: T || U <: T - outcols[k] - else - copyto!(Tables.allocatecolumn(U, length(outcols[k])), - 1, outcols[k], 1, k >= j ? i-1 : i) - end + j1 = fill_row!(firstrow, outcols, 1, 1, colnames) + @assert j1 === nothing # eltype is guaranteed to match + + # Handle groups other than the first one + # Create up to one task per thread + # This has lower overhead than creating one task per group, + # but is optimal only if operations take roughly the same time for all groups + if VERSION >= v"1.4" && isthreadsafe(outcols, incols) + basesize = max(1, cld(len - 1, Threads.nthreads())) + partitions = Iterators.partition(2:len, basesize) + else + partitions = (2:len,) + end + widen_type_lock = ReentrantLock() + outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols) + type_widened = fill(false, length(partitions)) + tasks = Vector{Task}(undef, length(partitions)) + for (tid, idx) in enumerate(partitions) + tasks[tid] = + @spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx), + outcols, outcolsref, + type_widened, widen_type_lock, + f, gd, starts, ends, incols, colnames, + firstmulticol) + end + + # Workaround JuliaLang/julia#38931: + # we want to preserve the exception type thrown in user code, + # and print the backtrace corresponding to it + for t in tasks + try + wait(t) + catch e + throw(t.exception) + end + end + + # Copy data for any tasks that finished before others widened columns + oldoutcols = outcols + outcols = outcolsref[] + if outcols !== oldoutcols # first group + for k in 1:length(outcols) + outcols[k][1] = oldoutcols[k][1] + end + end + for (tid, idx) in enumerate(partitions) + if type_widened[tid] + oldoutcols = fetch(tasks[tid]) + for k in 1:length(outcols) + # Check whether this particular column has been widened + if oldoutcols[k] !== outcols[k] + copyto!(outcols[k], first(idx), oldoutcols[k], first(idx), + last(idx) - first(idx) + 1) end end - return _combine_rows_with_first!(row, newcols, i, j, - f, gd, incols, colnames, firstmulticol) end end + return outcols, colnames end diff --git a/test/grouping.jl b/test/grouping.jl index 3551766845..f46aa6f87d 100644 --- a/test/grouping.jl +++ b/test/grouping.jl @@ -3761,4 +3761,74 @@ end end end +@testset "result eltype widening from different tasks" begin + if VERSION < v"1.5" + Base.convert(::Type{Union{Missing, Nothing, Float64}}, x::Int) = float(x) + end + Random.seed!(1) + for y in (Any[1, missing, missing, 2, 4], + Any[1, missing, nothing, 2.1, 'a'], + Any[1, 1, missing, 1, nothing, 1, 2.1, 1, 'a'], + Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'], + Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']), + x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y))) + df = DataFrame(x=x, y1=y, y2=reverse(y)) + gd = groupby(df, :x) + res = combine(gd, :y1 => (y -> y[1]) => :y1, :y2 => (y -> y[end]) => :y2) + # sleep ensures one task will widen the result after the other is done, + # so that data has to be copied at the end + @test res ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); y[end])) => :y2) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep(x[1]/100); y[end])) => :y2) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); y[1])) => :y1, + [:x, :y2] => ((x, y) -> (sleep(rand()/10); y[end])) => :y2) + + if df.x == 1:nrow(df) + @test res ≅ df + end + + res = combine(gd, :y1 => (y -> (y1=y[1], y2=y[end])) => AsTable, + :y2 => (y -> (y3=y[1], y4=y[end])) => AsTable) + # sleep ensures one task will widen the result after the other is done, + # so that data has to be copied at the end + @test res ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); (y3=y[1], y4=y[end]))) => AsTable) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep(x[1]/100); (y3=y[1], y4=y[end]))) => AsTable) ≅ + combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=y[end]))) => AsTable, + [:x, :y2] => ((x, y) -> (sleep(rand()/10); (y3=y[1], y4=y[end]))) => AsTable) + end +end + +@testset "CategoricalArray thread safety" begin + # These tests do not actually trigger multithreading bugs, + # but at least they check that the code that disables multithreading + # with CategoricalArray when levels are different works + Random.seed!(35) + df = DataFrame(x=rand(1:10, 100), + y=categorical(rand(10:15, 100)), + z=categorical(rand(0:20, 100))) + df.y2 = reverse(df.y) # Same levels + gd = groupby(df, :x) + + @test combine(gd, :y => (y -> y[1]) => :res) == + combine(gd, [:y, :y2] => ((y, x) -> y[1]) => :res) == + combine(gd, [:y, :x] => ((y, x) -> y[1]) => :res) == + combine(gd, [:y, :z] => ((y, z) -> y[1]) => :res) == + combine(gd, :y => (y -> get(y[1])) => :res) + + @test combine(gd, [:x, :y, :y2] => + ((x, y, y2) -> x[1] <= 5 ? y[1] : y2[1]) => :res) == + combine(gd, [:x, :y, :y2] => + ((x, y, y2) -> x[1] <= 5 ? get(y[1]) : get(y2[1])) => :res) + + @test combine(gd, [:x, :y, :z] => + ((x, y, z) -> x[1] <= 5 ? y[1] : z[1]) => :res) == + combine(gd, [:x, :y, :z] => + ((x, y, z) -> x[1] <= 5 ? get(y[1]) : get(z[1])) => :res) +end + end # module diff --git a/test/runtests.jl b/test/runtests.jl index ca9e1fd87e..a2f9ff1fc2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -8,8 +8,12 @@ anyerrors = false using DataFrames, Dates, Test, Random -if VERSION > v"1.3" && Threads.nthreads() < 2 - @warn("Running with only one thread: correctness of parallel operations is not tested") +if VERSION > v"1.3" + if Threads.nthreads() < 2 + @warn("Running with only one thread: correctness of parallel operations is not tested") + else + @show Threads.nthreads() + end end my_tests = ["utils.jl",