From 60df3fa98f5b367f4f4ae56fa89ace11be129715 Mon Sep 17 00:00:00 2001
From: John Victor Kew II
Date: Fri, 3 Mar 2023 09:53:56 -0800
Subject: [PATCH] FEAT 5460: Make DataFrameGroupBy.value_counts more distributed (#3)

* FIX-#4154: add value_counts method for SeriesGroupBy and DataFrameGroupBy (#5453)

Signed-off-by: Anatoly Myachev

* WIP GroupBy

* Seemingly working w/ Ponder and Modin

* Maintain proper value ordering for single group-by operations

* Linter updates from black on the files that have changed

* When normalize is used, or the groupby was performed with as_index=False,
  default to pandas. With Ponder this will currently result in a
  NotImplementedError.

* Remove _to_pandas() by implementing sort_index on Series on the service side

| Object | Call | Pushdown? | Compatibility |
| --- | --- | --- | --- |
| DataFrameGroupBy | value_counts() | Full | PASS |
| DataFrameGroupBy | value_counts(ascending=True) | Full | PASS |
| DataFrameGroupBy | value_counts(ascending=False) | Full | PASS |
| DataFrameGroupBy | value_counts(sort=False) | Full | PASS |
| DataFrameGroupBy | value_counts(sort=True) | Full | PASS |
| DataFrameGroupBy | value_counts(normalize=False) | Full | PASS |
| DataFrameGroupBy | value_counts(normalize=True) | | FAIL |
| DataFrame | groupby(as_index=False) | | FAIL |
| DataFrameGroupBy | value_counts(dropna=False) | Full | PASS |
| DataFrameGroupBy | value_counts(dropna=True) | | FAIL |

$\color{red}{\text{NOTE: For MultiIndex GroupBys, the n+1 level index is ignored in the sorting.}}$
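
For reference, the pushdown treats a grouped `value_counts` as a plain `DataFrame.value_counts` over the group keys plus the remaining columns, followed by a sort on the group-key index level only, which leaves the count-descending order within each group untouched. A minimal sketch of the intended equivalence against stock pandas (the `team`/`result` columns are made-up illustration data):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "team": ["a", "a", "b", "b", "b"],
        "result": ["win", "win", "loss", "win", "loss"],
    }
)

# Behavior this patch pushes down:
grouped = df.groupby("team").value_counts()

# Rough reference computation: count over group keys + value columns,
# then reorder on the first index level only.
reference = df.value_counts(subset=["team", "result"]).sort_index(
    level=0, sort_remaining=False
)

# Both are MultiIndex Series keyed by (team, result) with the same counts.
print(grouped)
print(reference)
```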
---------

Signed-off-by: Anatoly Myachev
Co-authored-by: Anatoly Myachev
---
 modin/pandas/groupby.py | 49 ++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 44 insertions(+), 5 deletions(-)

diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py
index d6dde786768..d7726a431d7 100644
--- a/modin/pandas/groupby.py
+++ b/modin/pandas/groupby.py
@@ -218,7 +218,7 @@ def skew(self, *args, **kwargs):
         )
 
     def ffill(self, limit=None):
-        return self.fillna(limit=limit, method='ffill')
+        return self.fillna(limit=limit, method="ffill")
 
     def sem(self, ddof=1):
         return self._wrap_aggregation(
@@ -235,14 +235,35 @@ def value_counts(
         ascending: bool = False,
         dropna: bool = True,
     ):
-        return self._default_to_pandas(
-            lambda df: df.value_counts(
+        # Compatibility notes:
+        #   DataFrameGroupBy.value_counts is nearly semantically equivalent to
+        #   df.value_counts([<by>, <columns>]).sort_index(); it returns a
+        #   MultiIndex Series which needs to be converted to pandas for
+        #   sort_index.
+        #
+        # Semantic exceptions:
+        #   normalize does not work; it returns the normalized results across
+        #   the entire dataframe, not within the sub-levels.
+        #   DataFrame.groupby(as_index=False) does not work; the default is True.
+        #   Calling this function will always result in a Series rather than a
+        #   DataFrame.
+        #
+        if is_list_like(self._by):
+            subset = self._by
+        elif isinstance(self._by, type(self._query_compiler)):
+            subset = self._by.columns.values.tolist()
+        for c in self._columns.values.tolist():
+            if c not in subset:
+                subset.append(c)
+        return (
+            self._df.value_counts(
                 subset=subset,
                 normalize=normalize,
                 sort=sort,
                 ascending=ascending,
                 dropna=dropna,
             )
+            .sort_index(level=0, sort_remaining=False)
         )
 
     def mean(self, numeric_only=None):
@@ -618,7 +639,7 @@ def cummin(self, axis=0, **kwargs):
         )
 
     def bfill(self, limit=None):
-        return self.fillna(limit=limit, method='bfill')
+        return self.fillna(limit=limit, method="bfill")
 
     def idxmin(self):
         return self._default_to_pandas(lambda df: df.idxmin())
@@ -787,7 +808,7 @@ def corrwith(self):
         return self._default_to_pandas(lambda df: df.corrwith)
 
     def pad(self, limit=None):
-        return self.fillna(limit=limit, method='pad')
+        return self.fillna(limit=limit, method="pad")
 
     def max(self, numeric_only=False, min_count=-1):
         return self._wrap_aggregation(
@@ -1453,6 +1474,24 @@ def _iter(self):
             for k in (sorted(group_ids) if self._sort else group_ids)
         )
 
+    def value_counts(
+        self,
+        normalize: bool = False,
+        sort: bool = True,
+        ascending: bool = False,
+        bins=None,
+        dropna: bool = True,
+    ):
+        return self._default_to_pandas(
+            lambda ser: ser.value_counts(
+                normalize=normalize,
+                sort=sort,
+                ascending=ascending,
+                bins=bins,
+                dropna=dropna,
+            )
+        )
+
     def unique(self):
         return self._check_index(
             self._wrap_aggregation(