Skip to content

Commit

Permalink
[FEA] Support named aggregations in df.groupby().agg() (#16528)
Browse files Browse the repository at this point in the history
Closes #15967

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)

URL: #16528
  • Loading branch information
Matt711 authored Aug 15, 2024
1 parent d684ae0 commit 0253e97
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 21 deletions.
7 changes: 2 additions & 5 deletions python/cudf/cudf/core/column_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def _pad_key(self, key: Any, pad_value="") -> Any:
return key + (pad_value,) * (self.nlevels - len(key))

def rename_levels(
self, mapper: Mapping[Any, Any] | Callable, level: int | None
self, mapper: Mapping[Any, Any] | Callable, level: int | None = None
) -> ColumnAccessor:
"""
Rename the specified levels of the given ColumnAccessor
Expand Down Expand Up @@ -653,10 +653,7 @@ def rename_column(x):
return x

if level is None:
raise NotImplementedError(
"Renaming columns with a MultiIndex and level=None is"
"not supported"
)
level = 0
new_col_names = (rename_column(k) for k in self.keys())

else:
Expand Down
41 changes: 26 additions & 15 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def _groupby(self):
)

@_performance_tracking
def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs):
def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs):
"""
Apply aggregation(s) to the groups.
Expand Down Expand Up @@ -648,11 +648,10 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs):
raise NotImplementedError(
"Passing args to func is currently not supported."
)
if kwargs:
raise NotImplementedError(
"Passing kwargs to func is currently not supported."
)
column_names, columns, normalized_aggs = self._normalize_aggs(func)

column_names, columns, normalized_aggs = self._normalize_aggs(
func, **kwargs
)
orig_dtypes = tuple(c.dtype for c in columns)

# Note: When there are no key columns, the below produces
Expand Down Expand Up @@ -1266,11 +1265,11 @@ def _grouped(self, *, include_groups: bool = True):
return (group_names, offsets, grouped_keys, grouped_values)

def _normalize_aggs(
self, aggs: MultiColumnAggType
self, aggs: MultiColumnAggType, **kwargs
) -> tuple[Iterable[Any], tuple[ColumnBase, ...], list[list[AggType]]]:
"""
Normalize aggs to a list of list of aggregations, where `out[i]`
is a list of aggregations for column `self.obj[i]`. We support three
is a list of aggregations for column `self.obj[i]`. We support four
different forms of `aggs` input here:
- A single agg, such as "sum". This agg is applied to all value
columns.
Expand All @@ -1279,18 +1278,30 @@ def _normalize_aggs(
- A mapping of column name to aggs, such as
{"a": ["sum"], "b": ["mean"]}, the aggs are applied to specified
column.
- Pairs of column name and agg tuples passed as kwargs,
e.g. col1=("a", "sum"), col2=("b", "prod"). The output column names are
the keys. Each agg is applied to the corresponding column in its tuple.
Each agg can be a string or a lambda function.
"""

aggs_per_column: Iterable[AggType | Iterable[AggType]]
if isinstance(aggs, dict):
column_names, aggs_per_column = aggs.keys(), aggs.values()
columns = tuple(self.obj._data[col] for col in column_names)
# TODO: Remove isinstance condition when the legacy dask_cudf API is removed.
# See https://github.com/rapidsai/cudf/pull/16528#discussion_r1715482302 for information.
if aggs or isinstance(aggs, dict):
if isinstance(aggs, dict):
column_names, aggs_per_column = aggs.keys(), aggs.values()
columns = tuple(self.obj._data[col] for col in column_names)
else:
values = self.grouping.values
column_names = values._column_names
columns = values._columns
aggs_per_column = (aggs,) * len(columns)
elif not aggs and kwargs:
column_names, aggs_per_column = kwargs.keys(), kwargs.values()
columns = tuple(self.obj._data[x[0]] for x in kwargs.values())
aggs_per_column = tuple(x[1] for x in kwargs.values())
else:
values = self.grouping.values
column_names = values._column_names
columns = values._columns
aggs_per_column = (aggs,) * len(columns)
raise TypeError("Must provide at least one aggregation function.")

# is_list_like performs type narrowing but type-checkers don't
# know it. One could add a TypeGuard annotation to
Expand Down
30 changes: 30 additions & 0 deletions python/cudf/cudf/tests/groupby/test_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

import cudf
from cudf.testing import assert_eq


@pytest.mark.parametrize(
Expand All @@ -26,3 +27,32 @@ def test_series_agg(attr):
pd_agg = getattr(pdf.groupby(["a"])["a"], attr)("count")

assert agg.ndim == pd_agg.ndim


@pytest.mark.parametrize("func", ["sum", "prod", "mean", "count"])
@pytest.mark.parametrize("attr", ["agg", "aggregate"])
def test_dataframe_agg(attr, func):
    """Check that cudf groupby aggregation matches pandas for each call form.

    The same aggregation is requested four ways -- a bare function name, a
    column-to-function mapping, a list of functions, and named aggregations
    passed as keyword arguments -- and the cudf result is compared against
    the pandas result for each.
    """
    gdf = cudf.DataFrame({"a": [1, 2, 1, 2], "b": [0, 0, 0, 0]})
    pdf = gdf.to_pandas()

    # Each entry is (positional args, keyword args) for the agg call,
    # checked in the listed order.
    call_forms = [
        ((func,), {}),
        (({"b": func},), {}),
        (([func],), {}),
        ((), {"foo": ("b", func), "bar": ("a", func)}),
    ]

    for pos_args, named_aggs in call_forms:
        # A fresh groupby object is built for every call on both sides.
        got = getattr(gdf.groupby("a"), attr)(*pos_args, **named_aggs)
        expect = getattr(pdf.groupby(["a"]), attr)(*pos_args, **named_aggs)

        assert_eq(got, expect)
4 changes: 4 additions & 0 deletions python/cudf/cudf/tests/test_column_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ def test_replace_level_values_MultiColumn():
got = ca.rename_levels(mapper={"a": "f"}, level=0)
check_ca_equal(expect, got)

# passing without level kwarg assumes level=0
got = ca.rename_levels(mapper={"a": "f"})
check_ca_equal(expect, got)


def test_clear_nrows_empty_before():
ca = ColumnAccessor({})
Expand Down
1 change: 0 additions & 1 deletion python/cudf/cudf/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9409,7 +9409,6 @@ def test_rename_for_level_RangeIndex_dataframe():
assert_eq(expect, got)


@pytest_xfail(reason="level=None not implemented yet")
def test_rename_for_level_is_None_MC():
gdf = cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
gdf.columns = pd.MultiIndex.from_tuples([("a", 1), ("a", 2), ("b", 1)])
Expand Down

0 comments on commit 0253e97

Please sign in to comment.