Skip to content

feat: add axis param to simple df aggregations #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,22 +822,54 @@ def filter(self, column_id: str, keep_null: bool = False):
index_labels=self.index.names,
)

def aggregate_all_and_pivot(
def aggregate_all_and_stack(
self,
operation: agg_ops.AggregateOp,
*,
axis: int | str = 0,
value_col_id: str = "values",
dropna: bool = True,
dtype=pd.Float64Dtype(),
) -> Block:
aggregations = [(col_id, operation, col_id) for col_id in self.value_columns]
result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
row_labels=self.column_labels.to_list(),
index_col_id="index",
unpivot_columns=[(value_col_id, self.value_columns)],
dtype=dtype,
)
return Block(result_expr, index_columns=["index"], column_labels=[None])
axis_n = utils.get_axis_number(axis)
if axis_n == 0:
aggregations = [
(col_id, operation, col_id) for col_id in self.value_columns
]
result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot(
row_labels=self.column_labels.to_list(),
index_col_id="index",
unpivot_columns=[(value_col_id, self.value_columns)],
dtype=dtype,
)
return Block(result_expr, index_columns=["index"], column_labels=[None])
else: # axis_n == 1
# using offsets as identity to group on.
# TODO: Allow to promote identity/total_order columns instead for better perf
expr_with_offsets, offset_col = self.expr.promote_offsets()
stacked_expr = expr_with_offsets.unpivot(
row_labels=self.column_labels.to_list(),
index_col_id=guid.generate_guid(),
unpivot_columns=[(value_col_id, self.value_columns)],
passthrough_columns=[*self.index_columns, offset_col],
dtype=dtype,
)
index_aggregations = [
(col_id, agg_ops.AnyValueOp(), col_id)
for col_id in [*self.index_columns]
]
main_aggregation = (value_col_id, operation, value_col_id)
result_expr = stacked_expr.aggregate(
[*index_aggregations, main_aggregation],
by_column_ids=[offset_col],
dropna=dropna,
)
return Block(
result_expr.drop_columns([offset_col]),
self.index_columns,
column_labels=[None],
index_labels=self.index_labels,
)

def select_column(self, id: str) -> Block:
return self.select_columns([id])
Expand Down
63 changes: 40 additions & 23 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,41 +1462,48 @@ def dropna(
def any(
self,
*,
axis: typing.Union[str, int] = 0,
bool_only: bool = False,
) -> bigframes.series.Series:
if not bool_only:
frame = self._raise_on_non_boolean("any")
else:
frame = self._drop_non_bool()
block = frame._block.aggregate_all_and_pivot(
agg_ops.any_op, dtype=pandas.BooleanDtype()
block = frame._block.aggregate_all_and_stack(
agg_ops.any_op, dtype=pandas.BooleanDtype(), axis=axis
)
return bigframes.series.Series(block.select_column("values"))

def all(self, *, bool_only: bool = False) -> bigframes.series.Series:
def all(
self, axis: typing.Union[str, int] = 0, *, bool_only: bool = False
) -> bigframes.series.Series:
if not bool_only:
frame = self._raise_on_non_boolean("all")
else:
frame = self._drop_non_bool()
block = frame._block.aggregate_all_and_pivot(
agg_ops.all_op, dtype=pandas.BooleanDtype()
block = frame._block.aggregate_all_and_stack(
agg_ops.all_op, dtype=pandas.BooleanDtype(), axis=axis
)
return bigframes.series.Series(block.select_column("values"))

def sum(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def sum(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("sum")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.sum_op)
block = frame._block.aggregate_all_and_stack(agg_ops.sum_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

def mean(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def mean(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("mean")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.mean_op)
block = frame._block.aggregate_all_and_stack(agg_ops.mean_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

def median(
Expand All @@ -1510,47 +1517,57 @@ def median(
frame = self._raise_on_non_numeric("median")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.median_op)
block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
return bigframes.series.Series(block.select_column("values"))

def std(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def std(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("std")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.std_op)
block = frame._block.aggregate_all_and_stack(agg_ops.std_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

def var(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def var(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("var")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.var_op)
block = frame._block.aggregate_all_and_stack(agg_ops.var_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

def min(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def min(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("min")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.min_op)
block = frame._block.aggregate_all_and_stack(agg_ops.min_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

def max(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def max(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("max")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.max_op)
block = frame._block.aggregate_all_and_stack(agg_ops.max_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

def prod(self, *, numeric_only: bool = False) -> bigframes.series.Series:
def prod(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
) -> bigframes.series.Series:
if not numeric_only:
frame = self._raise_on_non_numeric("prod")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.product_op)
block = frame._block.aggregate_all_and_stack(agg_ops.product_op, axis=axis)
return bigframes.series.Series(block.select_column("values"))

product = prod
Expand All @@ -1560,11 +1577,11 @@ def count(self, *, numeric_only: bool = False) -> bigframes.series.Series:
frame = self
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_pivot(agg_ops.count_op)
block = frame._block.aggregate_all_and_stack(agg_ops.count_op)
return bigframes.series.Series(block.select_column("values"))

def nunique(self) -> bigframes.series.Series:
block = self._block.aggregate_all_and_pivot(agg_ops.nunique_op)
block = self._block.aggregate_all_and_stack(agg_ops.nunique_op)
return bigframes.series.Series(block.select_column("values"))

def agg(
Expand All @@ -1587,7 +1604,7 @@ def agg(
)
else:
return bigframes.series.Series(
self._block.aggregate_all_and_pivot(
self._block.aggregate_all_and_stack(
agg_ops.lookup_agg_func(typing.cast(str, func))
)
)
Expand Down
30 changes: 29 additions & 1 deletion tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,29 @@ def test_dataframe_aggregates(scalars_df_index, scalars_pandas_df_index, op):
pd.testing.assert_series_equal(pd_series, bf_result, check_index_type=False)


@pytest.mark.parametrize(
    ("op"),
    [
        (lambda x: x.sum(axis=1, numeric_only=True)),
        (lambda x: x.mean(axis=1, numeric_only=True)),
        (lambda x: x.min(axis=1, numeric_only=True)),
        (lambda x: x.max(axis=1, numeric_only=True)),
        (lambda x: x.std(axis=1, numeric_only=True)),
        (lambda x: x.var(axis=1, numeric_only=True)),
    ],
    ids=["sum", "mean", "min", "max", "std", "var"],
)
def test_dataframe_aggregates_axis_1(scalars_df_index, scalars_pandas_df_index, op):
    """Row-wise (axis=1) numeric aggregations should match pandas.

    Applies the same aggregation lambda to a bigframes DataFrame and the
    equivalent pandas DataFrame over a mixed-dtype column subset, then
    compares the resulting Series.
    """
    # Mixed dtypes on purpose: numeric_only=True must drop bool_col/string_col
    # consistently in both libraries before aggregating across columns.
    col_names = ["int64_too", "int64_col", "float64_col", "bool_col", "string_col"]
    bf_result = op(scalars_df_index[col_names]).to_pandas()
    pd_result = op(scalars_pandas_df_index[col_names])

    # Pandas may produce narrower numeric types, but bigframes always produces Float64
    pd_result = pd_result.astype("Float64")
    # Pandas has object index type, so skip the index dtype comparison
    pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False)


def test_dataframe_aggregates_median(scalars_df_index, scalars_pandas_df_index):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col"]
bf_result = scalars_df_index[col_names].median(numeric_only=True).to_pandas()
Expand All @@ -2019,11 +2042,16 @@ def test_dataframe_aggregates_median(scalars_df_index, scalars_pandas_df_index):
[
(lambda x: x.all(bool_only=True)),
(lambda x: x.any(bool_only=True)),
(lambda x: x.all(axis=1, bool_only=True)),
(lambda x: x.any(axis=1, bool_only=True)),
],
ids=["all", "any"],
ids=["all_axis0", "any_axis0", "all_axis1", "any_axis1"],
)
def test_dataframe_bool_aggregates(scalars_df_index, scalars_pandas_df_index, op):
# Pandas will drop nullable 'boolean' dtype so we convert first to bool, then cast back later
scalars_df_index = scalars_df_index.assign(
bool_col=scalars_df_index.bool_col.fillna(False)
)
scalars_pandas_df_index = scalars_pandas_df_index.assign(
bool_col=scalars_pandas_df_index.bool_col.fillna(False).astype("bool")
)
Expand Down
Loading