Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BREAKING] fix isagg to correctly use a fast path #2357

Merged
merged 8 commits into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 70 additions & 68 deletions src/groupeddataframe/splitapplycombine.jl
Original file line number Diff line number Diff line change
Expand Up @@ -764,38 +764,35 @@ struct Reduce{O, C, A} <: AbstractAggregate
end
Reduce(f, condf=nothing, adjust=nothing) = Reduce(f, condf, adjust, false)

check_aggregate(f::Any) = f
validate_aggregate(f::Any, col::AbstractVector) = false

check_aggregate(::typeof(sum)) = Reduce(Base.add_sum)
validate_aggregate(::typeof(sum), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(sum∘skipmissing)) = Reduce(Base.add_sum, !ismissing)
validate_aggregate(::typeof(sum∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(prod)) = Reduce(Base.mul_prod)
validate_aggregate(::typeof(prod), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(prod∘skipmissing)) = Reduce(Base.mul_prod, !ismissing)
validate_aggregate(::typeof(prod∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(maximum)) = Reduce(max)
validate_aggregate(::typeof(maximum), ::AbstractVector{<:Union{Missing, Real}}) = true

check_aggregate(::typeof(maximum∘skipmissing)) = Reduce(max, !ismissing, nothing, true)
validate_aggregate(::typeof(maximum∘skipmissing), ::AbstractVector{<:Union{Missing, Real}}) = true

check_aggregate(::typeof(minimum)) = Reduce(min)
validate_aggregate(::typeof(minimum), ::AbstractVector{<:Union{Missing, Real}}) = true

check_aggregate(::typeof(minimum∘skipmissing)) = Reduce(min, !ismissing, nothing, true)
validate_aggregate(::typeof(minimum∘skipmissing), ::AbstractVector{<:Union{Missing, Real}}) = true

check_aggregate(::typeof(mean)) = Reduce(Base.add_sum, nothing, /)
validate_aggregate(::typeof(mean), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(mean∘skipmissing)) = Reduce(Base.add_sum, !ismissing, /)
validate_aggregate(::typeof(mean∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) = true
check_aggregate(f::Any, ::AbstractVector) = f
check_aggregate(f::typeof(sum), ::AbstractVector{<:Union{Missing, Number}}) =
Reduce(Base.add_sum)
check_aggregate(f::typeof(sum∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) =
Reduce(Base.add_sum, !ismissing)
check_aggregate(f::typeof(prod), ::AbstractVector{<:Union{Missing, Number}}) =
Reduce(Base.mul_prod)
check_aggregate(f::typeof(prod∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) =
Reduce(Base.mul_prod, !ismissing)
check_aggregate(f::typeof(maximum),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(maximum), v::AbstractVector{<:Union{Missing, Real}}) =
eltype(v) === Any ? f : Reduce(max)
check_aggregate(f::typeof(maximum∘skipmissing),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(maximum∘skipmissing), v::AbstractVector{<:Union{Missing, Real}}) =
eltype(v) === Any ? f : Reduce(max, !ismissing, nothing, true)
check_aggregate(f::typeof(minimum),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(minimum), v::AbstractVector{<:Union{Missing, Real}}) =
eltype(v) === Any ? f : Reduce(min)
check_aggregate(f::typeof(minimum∘skipmissing),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(minimum∘skipmissing), v::AbstractVector{<:Union{Missing, Real}}) =
eltype(v) === Any ? f : Reduce(min, !ismissing, nothing, true)
check_aggregate(f::typeof(mean), ::AbstractVector{<:Union{Missing, Number}}) =
Reduce(Base.add_sum, nothing, /)
check_aggregate(f::typeof(mean∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) =
Reduce(Base.add_sum, !ismissing, /)

# Other aggregate functions which are not strictly reductions
struct Aggregate{F, C} <: AbstractAggregate
Expand All @@ -804,36 +801,31 @@ struct Aggregate{F, C} <: AbstractAggregate
end
Aggregate(f) = Aggregate(f, nothing)

check_aggregate(::typeof(var)) = Aggregate(var)
validate_aggregate(::typeof(var), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(var∘skipmissing)) = Aggregate(var, !ismissing)
validate_aggregate(::typeof(var∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(std)) = Aggregate(std)
validate_aggregate(::typeof(std), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(std∘skipmissing)) = Aggregate(std, !ismissing)
validate_aggregate(::typeof(std∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) = true

check_aggregate(::typeof(first)) = Aggregate(first)
validate_aggregate(::typeof(first), v::AbstractVector) = eltype(v) === Any ? false : true
validate_aggregate(::typeof(first), ::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = false

check_aggregate(::typeof(first∘skipmissing)) = Aggregate(first, !ismissing)
validate_aggregate(::typeof(first∘skipmissing), v::AbstractVector) = eltype(v) === Any ? false : true
validate_aggregate(::typeof(first∘skipmissing), ::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = false

check_aggregate(::typeof(last)) = Aggregate(last)
validate_aggregate(::typeof(last), v::AbstractVector) = eltype(v) === Any ? false : true
validate_aggregate(::typeof(last), ::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = false

check_aggregate(::typeof(last∘skipmissing)) = Aggregate(last, !ismissing)
validate_aggregate(::typeof(last∘skipmissing), v::AbstractVector) = eltype(v) === Any ? false : true
validate_aggregate(::typeof(last∘skipmissing), ::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = false

check_aggregate(::typeof(length)) = Aggregate(length)
validate_aggregate(::typeof(length), ::AbstractVector) = true
check_aggregate(f::typeof(var), ::AbstractVector{<:Union{Missing, Number}}) =
Aggregate(var)
check_aggregate(f::typeof(var∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) =
Aggregate(var, !ismissing)
check_aggregate(f::typeof(std), ::AbstractVector{<:Union{Missing, Number}}) =
Aggregate(std)
check_aggregate(f::typeof(std∘skipmissing), ::AbstractVector{<:Union{Missing, Number}}) =
Aggregate(std, !ismissing)
check_aggregate(f::typeof(first), v::AbstractVector) =
eltype(v) === Any ? f : Aggregate(first)
check_aggregate(f::typeof(first),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(first∘skipmissing), v::AbstractVector) =
eltype(v) === Any ? f : Aggregate(first, !ismissing)
check_aggregate(f::typeof(first∘skipmissing),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(last), v::AbstractVector) =
eltype(v) === Any ? f : Aggregate(last)
check_aggregate(f::typeof(last),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(last∘skipmissing), v::AbstractVector) =
eltype(v) === Any ? f : Aggregate(last, !ismissing)
check_aggregate(f::typeof(last∘skipmissing),
::AbstractVector{<:Union{Missing, MULTI_COLS_TYPE, AbstractVector}}) = f
check_aggregate(f::typeof(length), ::AbstractVector) = Aggregate(length)

# SkipMissing does not support length

Expand Down Expand Up @@ -1046,6 +1038,12 @@ groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd)

# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
# TODO: remove this when 1,0 is no longer LTS
bkamins marked this conversation as resolved.
Show resolved Hide resolved
if VERSION < v"1.1"
Base.zero(::Type{Missing}) = missing
end

function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame)
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd)
# !ismissing check is purely an optimization to avoid a copy later
Expand All @@ -1055,14 +1053,18 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
T = real(eltype(means))
end
res = zeros(T, length(gd))
groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
false, incol, gd)
return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
false, incol, gd)
end

function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame)
outcol = Aggregate(var, agg.condf)(incol, gd)
map!(sqrt, outcol, outcol)
if eltype(outcol) <: Union{Missing, Rational}
return sqrt.(outcol)
else
return map!(sqrt, outcol, outcol)
end
end

for f in (first, last)
Expand Down Expand Up @@ -1091,7 +1093,7 @@ function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedData
end

isagg((col, fun)::Pair, gdf::GroupedDataFrame) =
col isa ColumnIndex && validate_aggregate(fun, parent(gdf)[!, col])
col isa ColumnIndex && check_aggregate(fun, parent(gdf)[!, col]) isa AbstractAggregate

function _agg2idx_map_helper(idx, idx_agg)
agg2idx_map = fill(-1, length(idx))
Expand Down Expand Up @@ -1166,7 +1168,7 @@ function _combine(f::AbstractVector{<:Pair},
source_cols, fun = p
if length(gd) > 0 && isagg(p, gd)
incol = parentdf[!, source_cols]
agg = check_aggregate(last(p))
agg = check_aggregate(last(p), incol)
outcol = agg(incol, gd)
res[i] = idx_agg, outcol
elseif keeprows && fun === identity && !(source_cols isa AsTable)
Expand Down
46 changes: 26 additions & 20 deletions test/grouping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const ≅ = isequal
isequal_typed(df1::AbstractDataFrame, df2::AbstractDataFrame) =
isequal(df1, df2) && eltype.(eachcol(df1)) == eltype.(eachcol(df2))

"""Check if passed data frames are `isequal` and have the same types of columns"""
isequal_coltyped(df1::AbstractDataFrame, df2::AbstractDataFrame) =
isequal(df1, df2) && typeof.(eachcol(df1)) == typeof.(eachcol(df2))

"""Check that groups in gd are equal to provided data frames, ignoring order"""
function isequal_unordered(gd::GroupedDataFrame,
dfs::AbstractVector{<:AbstractDataFrame})
Expand Down Expand Up @@ -2691,7 +2695,7 @@ end
@test map(nrow, gdf) == [1, 1, 1]
end

@testset "isagg fix" begin
@testset "check isagg correctly uses fast path only when it should" begin
for fun in (sum, prod, mean, var, std, sum∘skipmissing, prod∘skipmissing,
mean∘skipmissing, var∘skipmissing, std∘skipmissing),
col in ([1, 2, 3], [big(1.5), big(2.5), big(3.5)], [1 + 0.5im, 2 + 0.5im, 3 + 0.5im],
Expand All @@ -2702,7 +2706,7 @@ end
Union{Missing,Real}[1, 1.5, missing],
Union{Missing,Number}[1, 1.5, missing], Any[1, 1.5, missing])
gdf = groupby_checked(DataFrame(g=[1, 1, 1], x=col), :g)
@test combine(gdf, :x => fun => :y)combine(gdf, :x => (x -> fun(x)) => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y), combine(gdf, :x => (x -> fun(x)) => :y))
end

for fun in (maximum, minimum, maximum∘skipmissing, minimum∘skipmissing),
Expand All @@ -2714,7 +2718,7 @@ end
Union{Missing,Real}[1, 1.5, missing],
Union{Missing,Number}[1, 1.5, missing], Any[1, 1.5, missing])
gdf = groupby_checked(DataFrame(g=[1, 1, 1], x=col), :g)
@test combine(gdf, :x => fun => :y)combine(gdf, :x => (x -> fun(x)) => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y), combine(gdf, :x => (x -> fun(x)) => :y))
end

for fun in (first, last, length, first∘skipmissing, last∘skipmissing),
Expand All @@ -2726,17 +2730,17 @@ end
Union{Missing,Real}[1, 1.5, missing],
Union{Missing,Number}[1, 1.5, missing], Any[1, 1.5, missing])
gdf = groupby_checked(DataFrame(g=[1, 1, 1], x=col), :g)
if fun isa typeof(last∘skipmissing)
# this is another hard corner case
if fun === last∘skipmissing
# corner case - it fails in slow path, but works in fast path
if eltype(col) === Any
@test_throws MethodError combine(gdf, :x => fun => :y)
else
@test combine(gdf, :x => fun => :y) ==
combine(groupby_checked(dropmissing(parent(gdf)), :g), :x => fun => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y),
combine(groupby_checked(dropmissing(parent(gdf)), :g), :x => fun => :y))
end
@test_throws MethodError combine(gdf, :x => (x -> fun(x)) => :y)
else
@test combine(gdf, :x => fun => :y)combine(gdf, :x => (x -> fun(x)) => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y), combine(gdf, :x => (x -> fun(x)) => :y))
end
end

Expand All @@ -2747,17 +2751,17 @@ end
@test_throws MethodError combine(gdf, :x => fun => :y)
@test_throws MethodError combine(gdf, :x => (x -> fun(x)) => :y)
else
@test combine(gdf, :x => fun => :y)combine(gdf, :x => (x -> fun(x)) => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y), combine(gdf, :x => (x -> fun(x)) => :y))
end
end

for fun in (sum∘skipmissing, mean∘skipmissing),
col in ([1:3, 4:6, 7:9], [1:3, 4:6, missing])
gdf = groupby_checked(DataFrame(g=[1, 1, 1], x=col), :g)
@test combine(gdf, :x => fun => :y)combine(gdf, :x => (x -> fun(x)) => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y), combine(gdf, :x => (x -> fun(x)) => :y))
end

# workaround https://github.com/JuliaLang/julia/issues/36979
# see https://github.com/JuliaLang/julia/issues/36979
for fun in (var∘skipmissing, std∘skipmissing),
col in ([1:3, 4:6, 7:9], [1:3, 4:6, missing])
gdf = groupby_checked(DataFrame(g=[1, 1, 1], x=col), :g)
Expand All @@ -2773,7 +2777,7 @@ end
@test_throws MethodError combine(gdf, :x => fun => :y)
@test_throws MethodError combine(gdf, :x => (x -> fun(x)) => :y)
else
@test combine(gdf, :x => fun => :y)combine(gdf, :x => (x -> fun(x)) => :y)
@test isequal_coltyped(combine(gdf, :x => fun => :y), combine(gdf, :x => (x -> fun(x)) => :y))
end
end

Expand All @@ -2794,14 +2798,16 @@ end
[DataFrame(ones(2,2)), DataFrame(zeros(2,2)), missing],
[(a=1, b=2), (a=3, b=4), (a=5, b=6)], [(a=1, b=2), (a=3, b=4), missing])
gdf = groupby_checked(DataFrame(g=[1, 1, 1], x=col), :g)
if fun isa typeof(length)
@test combine(gdf, :x => fun => :y) ≅ DataFrame(g=1, y=3)
@test combine(gdf, :x => (x -> fun(x)) => :y) ≅ DataFrame(g=1, y=3)
elseif (fun isa typeof(last) && ismissing(last(col))) ||
(fun isa Union{typeof(maximum), typeof(minimum)} && col ≅ [(a=1, b=2), (a=3, b=4), missing])
# this case is a hard problem what to do and probably we have to leave it as is
@test combine(gdf, :x => fun => :y) ≅ DataFrame(g=1, y=missing)
@test combine(gdf, :x => (x -> fun(x)) => :y) ≅ DataFrame(g=1, y=missing)
if fun === length
@test isequal_coltyped(combine(gdf, :x => fun => :y), DataFrame(g=1, y=3))
@test isequal_coltyped(combine(gdf, :x => (x -> fun(x)) => :y), DataFrame(g=1, y=3))
elseif (fun === last && ismissing(last(col))) ||
(fun in (maximum, minimum) && col ≅ [(a=1, b=2), (a=3, b=4), missing])
# this case is a situation when the vector type would not be accepted in
# general as it contains entries that we do not allow but accidentally
# its last element is accepted because it is missing
@test isequal_coltyped(combine(gdf, :x => fun => :y), DataFrame(g=1, y=missing))
@test isequal_coltyped(combine(gdf, :x => (x -> fun(x)) => :y), DataFrame(g=1, y=missing))
else
@test_throws Union{ArgumentError, MethodError} combine(gdf, :x => fun => :y)
@test_throws Union{ArgumentError, MethodError} combine(gdf, :x => (x -> fun(x)) => :y)
Expand Down