Skip to content

Commit 5c9398c

Browse files
Merged groupby_agg and groupby_dict_agg to implement dictionary functions aggregations (#2317)
* FIX-#2254: Added dictionary functions to groupby aggregate tests
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Initial implementation of dictionary functions aggregation
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Remove lambda wrapper to allow dictionary to go to backend
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Fixed AttributeError not being thrown from getattr
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Lint fixes
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FEAT-#2363: fix index name setter in OmniSci backend
  Signed-off-by: ienkovich <ilya.enkovich@intel.com>
* FIX-#2254: Removed obsolete groupby_dict_agg API function
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Fixed dict aggregate for base backend
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Address reformatting comments
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Remove whitespace
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
* FIX-#2254: Removed redundant argument conversion because it is already done inside of base backend.
  Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>

Co-authored-by: ienkovich <ilya.enkovich@intel.com>
1 parent 2b0b755 commit 5c9398c

File tree

6 files changed

+47
-77
lines changed

6 files changed

+47
-77
lines changed

modin/backends/base/query_compiler.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,16 +1426,6 @@ def groupby_agg(
14261426
drop=drop,
14271427
)
14281428

1429-
def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
1430-
return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
1431-
self,
1432-
by=by,
1433-
func_dict=func_dict,
1434-
groupby_args=groupby_args,
1435-
agg_args=agg_args,
1436-
drop=drop,
1437-
)
1438-
14391429
# END Manual Partitioning methods
14401430

14411431
def unstack(self, level, fill_value):

modin/backends/pandas/query_compiler.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2573,7 +2573,8 @@ def groupby_agg(
25732573
groupby_kwargs,
25742574
drop=False,
25752575
):
2576-
agg_func = wrap_udf_function(agg_func)
2576+
if callable(agg_func):
2577+
agg_func = wrap_udf_function(agg_func)
25772578

25782579
if is_multi_by:
25792580
return super().groupby_agg(
@@ -2605,7 +2606,11 @@ def groupby_agg_builder(df):
26052606
def compute_groupby(df):
26062607
grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs)
26072608
try:
2608-
result = agg_func(grouped_df, **agg_kwargs)
2609+
result = (
2610+
grouped_df.agg(agg_func)
2611+
if isinstance(agg_func, dict)
2612+
else agg_func(grouped_df, **agg_kwargs)
2613+
)
26092614
# This happens when the partition is filled with non-numeric data and a
26102615
# numeric operation is done. We need to build the index here to avoid
26112616
# issues with extracting the index.
@@ -2631,7 +2636,9 @@ def compute_groupby(df):
26312636
# determining type of raised exception by applying `aggfunc`
26322637
# to empty DataFrame
26332638
try:
2634-
agg_func(
2639+
pandas.DataFrame(index=[1], columns=[1]).agg(agg_func) if isinstance(
2640+
agg_func, dict
2641+
) else agg_func(
26352642
pandas.DataFrame(index=[1], columns=[1]).groupby(level=0),
26362643
**agg_kwargs,
26372644
)

modin/data_management/functions/default_methods/groupby_default.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ def fn(
8080

8181
grp = df.groupby(by, axis=axis, **groupby_args)
8282
agg_func = cls.get_func(grp, key, **kwargs)
83-
result = agg_func(grp, **agg_args)
83+
result = (
84+
grp.agg(agg_func, **agg_args)
85+
if isinstance(agg_func, dict)
86+
else agg_func(grp, **agg_args)
87+
)
8488

8589
if not is_multi_by:
8690
if as_index:

modin/experimental/backends/omnisci/query_compiler.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -279,33 +279,6 @@ def groupby_agg(
279279
)
280280
return self.__constructor__(new_frame)
281281

282-
def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
283-
"""Apply aggregation functions to a grouped dataframe per-column.
284-
285-
Parameters
286-
----------
287-
by : DFAlgQueryCompiler
288-
The column to group by
289-
func_dict : dict of str, callable/string
290-
The dictionary mapping of column to function
291-
groupby_args : dict
292-
The dictionary of keyword arguments for the group by.
293-
agg_args : dict
294-
The dictionary of keyword arguments for the aggregation functions
295-
drop : bool
296-
Whether or not to drop the column from the data.
297-
298-
Returns
299-
-------
300-
DFAlgQueryCompiler
301-
The result of the per-column aggregations on the grouped dataframe.
302-
"""
303-
# TODO: handle `drop` arg
304-
new_frame = self._modin_frame.groupby_agg(
305-
by, 0, func_dict, groupby_args, **agg_args
306-
)
307-
return self.__constructor__(new_frame)
308-
309282
def count(self, **kwargs):
310283
return self._agg("count", **kwargs)
311284

modin/pandas/groupby.py

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,8 @@ def aggregate(self, func=None, *args, **kwargs):
357357
# This is not implemented in pandas,
358358
# so we throw a different message
359359
raise NotImplementedError("axis other than 0 is not supported")
360+
361+
relabeling_required = False
360362
if isinstance(func, dict) or func is None:
361363

362364
def _reconstruct_func(func, **kwargs):
@@ -380,50 +382,32 @@ def _reconstruct_func(func, **kwargs):
380382
from pandas.core.base import SpecificationError
381383

382384
raise SpecificationError("nested renamer is not supported")
383-
if isinstance(self._by, type(self._query_compiler)):
384-
by = list(self._by.columns)
385-
else:
386-
by = self._by
387-
388-
subset_cols = list(func_dict.keys()) + (
389-
list(self._by.columns)
390-
if isinstance(self._by, type(self._query_compiler))
391-
and all(c in self._df.columns for c in self._by.columns)
392-
else []
393-
)
394-
result = type(self._df)(
395-
query_compiler=self._df[subset_cols]._query_compiler.groupby_dict_agg(
396-
by=by,
397-
func_dict=func_dict,
398-
groupby_args=self._kwargs,
399-
agg_args=kwargs,
400-
drop=self._drop,
401-
)
402-
)
403-
404-
if relabeling_required:
405-
result = result.iloc[:, order]
406-
result.columns = new_columns
407-
408-
return result
409-
410-
if is_list_like(func):
385+
func = func_dict
386+
elif is_list_like(func):
411387
return self._default_to_pandas(
412388
lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs),
413389
*args,
414390
**kwargs,
415391
)
416-
if isinstance(func, str):
417-
agg_func = getattr(self, func, None)
392+
elif isinstance(func, str):
393+
# Using "getattr" here masks possible AttributeError which we throw
394+
# in __getattr__, so we should call __getattr__ directly instead.
395+
agg_func = self.__getattr__(func)
418396
if callable(agg_func):
419397
return agg_func(*args, **kwargs)
420-
return self._apply_agg_function(
421-
lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs),
398+
399+
result = self._apply_agg_function(
400+
func,
422401
drop=self._as_index,
423402
*args,
424403
**kwargs,
425404
)
426405

406+
if relabeling_required:
407+
result = result.iloc[:, order]
408+
result.columns = new_columns
409+
return result
410+
427411
agg = aggregate
428412

429413
def last(self, **kwargs):
@@ -888,7 +872,9 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
888872
-------
889873
A new combined DataFrame with the result of all groups.
890874
"""
891-
assert callable(f), "'{0}' object is not callable".format(type(f))
875+
assert callable(f) or isinstance(
876+
f, dict
877+
), "'{0}' object is not callable and not a dict".format(type(f))
892878

893879
# For aggregations, pandas behavior does this for the result.
894880
# For other operations it does not, so we wait until there is an aggregation to

modin/pandas/test/test_groupby.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,12 @@ def test_mixed_dtypes_groupby(as_index):
149149
eval_var(modin_groupby, pandas_groupby)
150150
eval_skew(modin_groupby, pandas_groupby)
151151

152-
agg_functions = ["min", "max"]
152+
agg_functions = [
153+
"min",
154+
"max",
155+
{"col2": "sum"},
156+
{"col2": "max", "col4": "sum", "col5": "min"},
157+
]
153158
for func in agg_functions:
154159
eval_agg(modin_groupby, pandas_groupby, func)
155160
eval_aggregate(modin_groupby, pandas_groupby, func)
@@ -479,7 +484,12 @@ def test_single_group_row_groupby():
479484
eval_prod(modin_groupby, pandas_groupby)
480485
eval_std(modin_groupby, pandas_groupby)
481486

482-
agg_functions = ["min", "max"]
487+
agg_functions = [
488+
"min",
489+
"max",
490+
{"col2": "sum"},
491+
{"col2": "max", "col4": "sum", "col5": "min"},
492+
]
483493
for func in agg_functions:
484494
eval_agg(modin_groupby, pandas_groupby, func)
485495
eval_aggregate(modin_groupby, pandas_groupby, func)
@@ -595,7 +605,7 @@ def test_large_row_groupby(is_by_category):
595605
# eval_prod(modin_groupby, pandas_groupby) causes overflows
596606
eval_std(modin_groupby, pandas_groupby)
597607

598-
agg_functions = ["min", "max"]
608+
agg_functions = ["min", "max", {"A": "sum"}, {"A": "max", "B": "sum", "C": "min"}]
599609
for func in agg_functions:
600610
eval_agg(modin_groupby, pandas_groupby, func)
601611
eval_aggregate(modin_groupby, pandas_groupby, func)

0 commit comments

Comments
 (0)