Multithreaded custom grouped operations with single-row result (#2588)
Spawn one task per thread in `_combine_rows_with_first!` so that custom grouped
operations that return a single row are run in parallel. This is optimal if
operations take about the same time for all groups. Spawning one task per group
could be faster if these times vary a lot, but the overhead would be larger:
we could add this as an option later.

The implementation is somewhat tricky as output columns need to be reallocated
when a new return type is detected.
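
For illustration (a usage sketch, not part of the commit; the data and column names are made up), any custom transformation returning a single row per group now runs across threads:

```julia
using DataFrames

df = DataFrame(id=repeat(1:1_000, inner=10), x=rand(10_000))
gd = groupby(df, :id)

# A custom function returning a single value per group is split across
# up to Threads.nthreads() tasks; start Julia with e.g. `julia -t 4`
combine(gd, :x => (x -> sum(abs2, x)) => :sumsq)
```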
nalimilan authored Feb 10, 2021
1 parent 2e3f1b0 commit ecfc733
Showing 5 changed files with 264 additions and 35 deletions.
docs/src/man/split_apply_combine.md (12 changes: 7 additions & 5 deletions)

@@ -122,11 +122,13 @@ It is allowed to mix single values and vectors if multiple transformations
 are requested. In this case single value will be repeated to match the length
 of columns specified by returned vectors.
 
-A separate task is spawned for each specified transformation, allowing for
-parallel operation when several transformations are requested and Julia was
-started with more than one thread. Passed transformation functions should
-therefore not modify global variables (i.e. they should be pure), or use
-locks to control parallel accesses.
+A separate task is spawned for each specified transformation; each transformation
+then spawns as many tasks as Julia threads, and splits processing of groups across
+them (however, currently transformations with optimized implementations like `sum`
+and transformations that return multiple rows use a single task for all groups).
+This allows for parallel operation when Julia was started with more than one
+thread. Passed transformation functions should therefore not modify global variables
+(i.e. they should be pure), or use locks to control parallel accesses.
 
 To apply `function` to each row instead of whole columns, it can be wrapped in a
 `ByRow` struct. `cols` can be any column indexing syntax, in which case
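
To make the purity requirement above concrete, here is a hedged sketch (the shared accumulator is hypothetical, not part of the package) of using a lock when a transformation must touch shared state:

```julia
using DataFrames

df = DataFrame(g=repeat(1:100, inner=5), v=rand(500))
gd = groupby(df, :g)

total = Ref(0.0)             # state shared across tasks
total_lock = ReentrantLock() # guards `total` against concurrent updates

combine(gd, :v => (v -> begin
           s = sum(v)
           lock(total_lock) do
               total[] += s  # groups may be processed concurrently
           end
           s
       end) => :groupsum)
```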
src/abstractdataframe/selection.jl (11 changes: 7 additions & 4 deletions)

@@ -145,10 +145,13 @@ const TRANSFORMATION_COMMON_RULES =
 `copycols=false`, a `SubDataFrame` is returned without copying columns.
 If a `GroupedDataFrame` is passed, a separate task is spawned for each
-specified transformation, allowing for parallel operation when several
-transformations are requested and Julia was started with more than one thread.
-Passed transformation functions should therefore not modify global variables
-(i.e. they should be pure), or use locks to control parallel accesses.
+specified transformation; each transformation then spawns as many tasks
+as Julia threads, and splits processing of groups across them
+(however, currently transformations with optimized implementations like `sum`
+and transformations that return multiple rows use a single task for all groups).
+This allows for parallel operation when Julia was started with more than one
+thread. Passed transformation functions should therefore not modify global
+variables (i.e. they should be pure), or use locks to control parallel accesses.
 In the future, parallelism may be extended to other cases, so this requirement
 also holds for `DataFrame` inputs.
 """
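
For instance (an illustrative sketch, not from the commit), the first call below hits the optimized single-task reduction for `sum`, while the second passes an anonymous function and is therefore eligible for the multithreaded path:

```julia
using DataFrames

gd = groupby(DataFrame(g=rand(1:10, 1_000), x=rand(1_000)), :g)

combine(gd, :x => sum => :s)            # optimized implementation, single task
combine(gd, :x => (x -> sum(x)) => :s)  # custom function, groups split across threads
```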
src/groupeddataframe/complextransforms.jl (198 changes: 174 additions & 24 deletions)

@@ -57,7 +57,7 @@ function _combine_with_first(first::Union{NamedTuple, DataFrameRow, AbstractData
                                                              f, gd, incols, targetcolnames,
                                                              firstmulticol)
     else
-        outcols, finalcolnames = _combine_rows_with_first!(first, initialcols, 1, 1,
+        outcols, finalcolnames = _combine_rows_with_first!(first, initialcols,
                                                            f, gd, incols, targetcolnames,
                                                            firstmulticol)
     end
@@ -92,9 +92,126 @@ function fill_row!(row, outcols::NTuple{N, AbstractVector},
     return nothing
 end
 
-function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
+function _combine_rows_with_first_task!(tid::Integer,
+                                        rowstart::Integer,
+                                        rowend::Integer,
+                                        rownext::Integer,
+                                        outcols::NTuple{<:Any, AbstractVector},
+                                        outcolsref::Ref{NTuple{<:Any, AbstractVector}},
+                                        type_widened::AbstractVector{Bool},
+                                        widen_type_lock::ReentrantLock,
+                                        f::Base.Callable,
+                                        gd::GroupedDataFrame,
+                                        starts::AbstractVector{<:Integer},
+                                        ends::AbstractVector{<:Integer},
+                                        incols::Union{Nothing, AbstractVector,
+                                                      Tuple, NamedTuple},
+                                        colnames::NTuple{<:Any, Symbol},
+                                        firstmulticol::Val)
+    j = nothing
+    gdidx = gd.idx
+    local newoutcols
+    for i in rownext:rowend
+        row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
+        j = fill_row!(row, outcols, i, 1, colnames)
+        if j !== nothing # Need to widen column
+            # If another thread is already widening outcols, wait until it's done
+            lock(widen_type_lock)
+            try
+                newoutcols = outcolsref[]
+                # Workaround for julia#15276
+                newoutcols = let i=i, j=j, newoutcols=newoutcols, row=row
+                    ntuple(length(newoutcols)) do k
+                        S = typeof(row[k])
+                        T = eltype(newoutcols[k])
+                        U = promote_type(S, T)
+                        if S <: T || U <: T
+                            newoutcols[k]
+                        else
+                            type_widened .= true
+                            Tables.allocatecolumn(U, length(newoutcols[k]))
+                        end
+                    end
+                end
+                for k in 1:length(outcols)
+                    if outcols[k] !== newoutcols[k]
+                        copyto!(newoutcols[k], rowstart,
+                                outcols[k], rowstart, i - rowstart + (k < j))
+                    end
+                end
+                j = fill_row!(row, newoutcols, i, j, colnames)
+                @assert j === nothing # eltype is guaranteed to match
+
+                outcolsref[] = newoutcols
+                type_widened[tid] = false
+            finally
+                unlock(widen_type_lock)
+            end
+            return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
+                                                  type_widened, widen_type_lock,
+                                                  f, gd, starts, ends,
+                                                  incols, colnames, firstmulticol)
+        end
+        # If other thread widened columns, copy already processed data to new vectors
+        # This doesn't have to happen immediately (hence type_widened isn't atomic),
+        # but the more we wait the more data will have to be copied
+        if type_widened[tid]
+            lock(widen_type_lock) do
+                type_widened[tid] = false
+                newoutcols = outcolsref[]
+                for k in 1:length(outcols)
+                    # Check whether this particular column has been widened
+                    if outcols[k] !== newoutcols[k]
+                        copyto!(newoutcols[k], rowstart,
+                                outcols[k], rowstart, i - rowstart + 1)
+                    end
+                end
+            end
+            return _combine_rows_with_first_task!(tid, rowstart, rowend, i+1, newoutcols, outcolsref,
+                                                  type_widened, widen_type_lock,
+                                                  f, gd, starts, ends,
+                                                  incols, colnames, firstmulticol)
+        end
+    end
+    return outcols
+end
+
+# CategoricalArray is thread-safe only when input and output levels are equal
+# since then all values are added upfront. Otherwise, if the function returns
+# CategoricalValues mixed from different pools, one thread may add values,
+# which may put the invpool in an invalid state while the other one is reading from it
+function isthreadsafe(outcols::NTuple{<:Any, AbstractVector},
+                      incols::Union{Tuple, NamedTuple})
+    anycat = any(outcols) do col
+        T = typeof(col)
+        # If the first result is missing, widening can give a CategoricalArray
+        # if later results are CategoricalValues
+        eltype(col) === Missing ||
+            (nameof(T) === :CategoricalArray &&
+             nameof(parentmodule(T)) === :CategoricalArrays)
+    end
+    if anycat
+        levs = nothing
+        for col in incols
+            T = typeof(col)
+            if nameof(T) === :CategoricalArray &&
+               nameof(parentmodule(T)) === :CategoricalArrays
+                if levs !== nothing
+                    levs == levels(col) || return false
+                else
+                    levs = levels(col)
+                end
+            end
+        end
+    end
+    return true
+end
+isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::AbstractVector) =
+    isthreadsafe(outcols, (incols,))
+isthreadsafe(outcols::NTuple{<:Any, AbstractVector}, incols::Nothing) = true
+
+function _combine_rows_with_first!(firstrow::Union{NamedTuple, DataFrameRow},
                                    outcols::NTuple{N, AbstractVector},
-                                   rowstart::Integer, colstart::Integer,
                                    f::Base.Callable, gd::GroupedDataFrame,
                                    incols::Union{Nothing, AbstractVector, Tuple, NamedTuple},
                                    colnames::NTuple{N, Symbol},
@@ -108,31 +225,64 @@ function _combine_rows_with_first!(first::Union{NamedTuple, DataFrameRow},
     len == 0 && return outcols, colnames
 
     # Handle first group
-    j = fill_row!(first, outcols, rowstart, colstart, colnames)
-    @assert j === nothing # eltype is guaranteed to match
-    # Handle remaining groups
-    @inbounds for i in rowstart+1:len
-        row = wrap_row(do_call(f, gdidx, starts, ends, gd, incols, i), firstmulticol)
-        j = fill_row!(row, outcols, i, 1, colnames)
-        if j !== nothing # Need to widen column type
-            local newcols
-            let i = i, j = j, outcols=outcols, row=row # Workaround for julia#15276
-                newcols = ntuple(length(outcols)) do k
-                    S = typeof(row[k])
-                    T = eltype(outcols[k])
-                    U = promote_type(S, T)
-                    if S <: T || U <: T
-                        outcols[k]
-                    else
-                        copyto!(Tables.allocatecolumn(U, length(outcols[k])),
-                                1, outcols[k], 1, k >= j ? i-1 : i)
-                    end
-                end
-            end
-            return _combine_rows_with_first!(row, newcols, i, j,
-                                             f, gd, incols, colnames, firstmulticol)
+    j1 = fill_row!(firstrow, outcols, 1, 1, colnames)
+    @assert j1 === nothing # eltype is guaranteed to match
+
+    # Handle groups other than the first one
+    # Create up to one task per thread
+    # This has lower overhead than creating one task per group,
+    # but is optimal only if operations take roughly the same time for all groups
+    if VERSION >= v"1.4" && isthreadsafe(outcols, incols)
+        basesize = max(1, cld(len - 1, Threads.nthreads()))
+        partitions = Iterators.partition(2:len, basesize)
+    else
+        partitions = (2:len,)
+    end
+    widen_type_lock = ReentrantLock()
+    outcolsref = Ref{NTuple{<:Any, AbstractVector}}(outcols)
+    type_widened = fill(false, length(partitions))
+    tasks = Vector{Task}(undef, length(partitions))
+    for (tid, idx) in enumerate(partitions)
+        tasks[tid] =
+            @spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
+                                                  outcols, outcolsref,
+                                                  type_widened, widen_type_lock,
+                                                  f, gd, starts, ends, incols, colnames,
+                                                  firstmulticol)
+    end
+
+    # Workaround JuliaLang/julia#38931:
+    # we want to preserve the exception type thrown in user code,
+    # and print the backtrace corresponding to it
+    for t in tasks
+        try
+            wait(t)
+        catch e
+            throw(t.exception)
+        end
+    end
+
+    # Copy data for any tasks that finished before others widened columns
+    oldoutcols = outcols
+    outcols = outcolsref[]
+    if outcols !== oldoutcols # first group
+        for k in 1:length(outcols)
+            outcols[k][1] = oldoutcols[k][1]
+        end
+    end
+    for (tid, idx) in enumerate(partitions)
+        if type_widened[tid]
+            oldoutcols = fetch(tasks[tid])
+            for k in 1:length(outcols)
+                # Check whether this particular column has been widened
+                if oldoutcols[k] !== outcols[k]
+                    copyto!(outcols[k], first(idx), oldoutcols[k], first(idx),
+                            last(idx) - first(idx) + 1)
+                end
+            end
         end
     end
 
     return outcols, colnames
 end
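
A standalone sketch of the partitioning scheme above (variable names mirror the commit; the group count and thread count are made up):

```julia
len = 10      # number of groups, hypothetical
nthreads = 4  # stand-in for Threads.nthreads()

# Group 1 is handled eagerly to establish the output column eltypes,
# so only groups 2:len are partitioned, with at most one chunk per thread
basesize = max(1, cld(len - 1, nthreads))
partitions = Iterators.partition(2:len, basesize)
@assert collect(partitions) == [2:4, 5:7, 8:10]
```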
test/grouping.jl (70 changes: 70 additions & 0 deletions)

@@ -3761,4 +3761,74 @@ end
     end
 end
 
+@testset "result eltype widening from different tasks" begin
+    if VERSION < v"1.5"
+        Base.convert(::Type{Union{Missing, Nothing, Float64}}, x::Int) = float(x)
+    end
+    Random.seed!(1)
+    for y in (Any[1, missing, missing, 2, 4],
+              Any[1, missing, nothing, 2.1, 'a'],
+              Any[1, 1, missing, 1, nothing, 1, 2.1, 1, 'a'],
+              Any[1, 2, 3, 4, 5, 6, 2.1, missing, 'a'],
+              Any[1, 2, 3.1, 4, 5, 6, 2.1, missing, 'a']),
+        x in (1:length(y), rand(1:2, length(y)), rand(1:3, length(y)))
+        df = DataFrame(x=x, y1=y, y2=reverse(y))
+        gd = groupby(df, :x)
+        res = combine(gd, :y1 => (y -> y[1]) => :y1, :y2 => (y -> y[end]) => :y2)
+        # sleep ensures one task will widen the result after the other is done,
+        # so that data has to be copied at the end
+        @test res ≅
+            combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); y[1])) => :y1,
+                        [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); y[end])) => :y2) ≅
+            combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); y[1])) => :y1,
+                        [:x, :y2] => ((x, y) -> (sleep(x[1]/100); y[end])) => :y2) ≅
+            combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); y[1])) => :y1,
+                        [:x, :y2] => ((x, y) -> (sleep(rand()/10); y[end])) => :y2)
+
+        if df.x == 1:nrow(df)
+            @test res ≅ df
+        end
+
+        res = combine(gd, :y1 => (y -> (y1=y[1], y2=y[end])) => AsTable,
+                      :y2 => (y -> (y3=y[1], y4=y[end])) => AsTable)
+        # sleep ensures one task will widen the result after the other is done,
+        # so that data has to be copied at the end
+        @test res ≅
+            combine(gd, [:x, :y1] => ((x, y) -> (sleep((x == [5])/10); (y1=y[1], y2=y[end]))) => AsTable,
+                        [:x, :y2] => ((x, y) -> (sleep((x == [5])/10); (y3=y[1], y4=y[end]))) => AsTable) ≅
+            combine(gd, [:x, :y1] => ((x, y) -> (sleep(x[1]/100); (y1=y[1], y2=y[end]))) => AsTable,
+                        [:x, :y2] => ((x, y) -> (sleep(x[1]/100); (y3=y[1], y4=y[end]))) => AsTable) ≅
+            combine(gd, [:x, :y1] => ((x, y) -> (sleep(rand()/10); (y1=y[1], y2=y[end]))) => AsTable,
+                        [:x, :y2] => ((x, y) -> (sleep(rand()/10); (y3=y[1], y4=y[end]))) => AsTable)
+    end
+end
+
+@testset "CategoricalArray thread safety" begin
+    # These tests do not actually trigger multithreading bugs,
+    # but at least they check that the code that disables multithreading
+    # with CategoricalArray when levels are different works
+    Random.seed!(35)
+    df = DataFrame(x=rand(1:10, 100),
+                   y=categorical(rand(10:15, 100)),
+                   z=categorical(rand(0:20, 100)))
+    df.y2 = reverse(df.y) # Same levels
+    gd = groupby(df, :x)
+
+    @test combine(gd, :y => (y -> y[1]) => :res) ==
+          combine(gd, [:y, :y2] => ((y, x) -> y[1]) => :res) ==
+          combine(gd, [:y, :x] => ((y, x) -> y[1]) => :res) ==
+          combine(gd, [:y, :z] => ((y, z) -> y[1]) => :res) ==
+          combine(gd, :y => (y -> get(y[1])) => :res)
+
+    @test combine(gd, [:x, :y, :y2] =>
+                      ((x, y, y2) -> x[1] <= 5 ? y[1] : y2[1]) => :res) ==
+          combine(gd, [:x, :y, :y2] =>
+                      ((x, y, y2) -> x[1] <= 5 ? get(y[1]) : get(y2[1])) => :res)
+
+    @test combine(gd, [:x, :y, :z] =>
+                      ((x, y, z) -> x[1] <= 5 ? y[1] : z[1]) => :res) ==
+          combine(gd, [:x, :y, :z] =>
+                      ((x, y, z) -> x[1] <= 5 ? get(y[1]) : get(z[1])) => :res)
+end
+
 end # module
test/runtests.jl (8 changes: 6 additions & 2 deletions)

@@ -8,8 +8,12 @@ anyerrors = false
 
 using DataFrames, Dates, Test, Random
 
-if VERSION > v"1.3" && Threads.nthreads() < 2
-    @warn("Running with only one thread: correctness of parallel operations is not tested")
+if VERSION > v"1.3"
+    if Threads.nthreads() < 2
+        @warn("Running with only one thread: correctness of parallel operations is not tested")
+    else
+        @show Threads.nthreads()
+    end
 end
 
 my_tests = ["utils.jl",
