[Data] Fixing aggregation protocol to be appropriately associative #50757

Merged 39 commits on Feb 21, 2025

Commits
14fb287 Make sure tests are run for both PA and Pandas (alexeykudinkin, Feb 20, 2025)
d8ae5fa Updated CI names to include PA version (alexeykudinkin, Feb 20, 2025)
a0722c8 Fixed Pandas block avoiding noisy warning (alexeykudinkin, Feb 20, 2025)
e8df5ad Tidying up (alexeykudinkin, Feb 20, 2025)
d4446ef Revisited null-safe aggregation protocol rebasing it onto `_Optional` (alexeykudinkin, Feb 20, 2025)
62013c3 Added formal test for aggregation protocol verifying all aggregation … (alexeykudinkin, Feb 20, 2025)
bdb90b2 Fixed NaN handling in Std/Mean (alexeykudinkin, Feb 20, 2025)
20db5a2 Fixed Std handling of ddof (alexeykudinkin, Feb 20, 2025)
cbae683 Fixed `Quantile` finalization seq (alexeykudinkin, Feb 20, 2025)
e70377c Updated ATA tests (alexeykudinkin, Feb 20, 2025)
16ee17a `lint` (alexeykudinkin, Feb 20, 2025)
7d7673d Added AbsMax & Unique to aggregation protocol test (alexeykudinkin, Feb 20, 2025)
715928b Fixed AbsMax aggregation seq (alexeykudinkin, Feb 20, 2025)
effa4cb Fixing test (alexeykudinkin, Feb 20, 2025)
f9165df Updated `Block.count` to add `ignore_nulls` param (alexeykudinkin, Feb 20, 2025)
a99abb8 Fixed Count aggregation to properly respect `ignore_nulls` (alexeykudinkin, Feb 20, 2025)
b304174 Added Count aggregation to protocol test (alexeykudinkin, Feb 20, 2025)
db1a7ea `lint` (alexeykudinkin, Feb 20, 2025)
c0fa3aa Refactored test to avoid unnecessary parameterization (alexeykudinkin, Feb 20, 2025)
0ea546c Fixed test to avoid sharing zero values (alexeykudinkin, Feb 20, 2025)
08ea9ea Aligned test with actual aggregation protocol (alexeykudinkin, Feb 20, 2025)
874ec56 Fixed global count case (alexeykudinkin, Feb 20, 2025)
3d567a9 Fixed `PandasBlockAccessor.count` to respect `ignore_nulls` (alexeykudinkin, Feb 20, 2025)
8b5e634 Properly propagate `ignore_nulls` (alexeykudinkin, Feb 20, 2025)
746b480 `lint` (alexeykudinkin, Feb 20, 2025)
6dba288 Added `zero_factory` param to produce "zero" for aggregation monoids (alexeykudinkin, Feb 20, 2025)
c1ea200 Simplified combination protocol (to avoid the need for `_Optional` co… (alexeykudinkin, Feb 20, 2025)
45db8e8 Added py-doc (alexeykudinkin, Feb 20, 2025)
986b751 Fixed Mean finalization seq (alexeykudinkin, Feb 20, 2025)
e2cddf6 Fixed Std finalization seq to properly handle zero (alexeykudinkin, Feb 20, 2025)
c0ff8df Fixed tests incorrect ref (alexeykudinkin, Feb 20, 2025)
66f10ee Fixed combination seq to avoid coercing nans into nones (alexeykudinkin, Feb 20, 2025)
48ca317 Added py-doc for combining (alexeykudinkin, Feb 20, 2025)
a405547 Tidying up (alexeykudinkin, Feb 20, 2025)
7530205 Tidying up more (alexeykudinkin, Feb 20, 2025)
7d1b555 `lint` (alexeykudinkin, Feb 20, 2025)
0b27650 Missing epilogue (alexeykudinkin, Feb 21, 2025)
fda3d6d Lifted ignore_nulls parameterization into creation path instead of ex… (alexeykudinkin, Feb 21, 2025)
1769d89 `lint` (alexeykudinkin, Feb 21, 2025)
8 changes: 4 additions & 4 deletions .buildkite/data.rayci.yml
@@ -54,7 +54,7 @@ steps:
       --only-tags data_non_parallel
     depends_on: data9build

-  - label: ":database: data: arrow latest tests"
+  - label: ":database: data: arrow v18 tests"
     tags:
       - python
       - data
@@ -68,7 +68,7 @@
       --except-tags data_integration,doctest,data_non_parallel
     depends_on: datalbuild

-  - label: ":database: data: arrow latest tests (data_non_parallel)"
+  - label: ":database: data: arrow v18 tests (data_non_parallel)"
     tags:
       - python
       - data
@@ -80,7 +80,7 @@
       --only-tags data_non_parallel
     depends_on: datalbuild

-  - label: ":database: data: arrow latest {{matrix.python}} tests ({{matrix.worker_id}})"
+  - label: ":database: data: arrow v18 {{matrix.python}} tests ({{matrix.worker_id}})"
     key: datal_python_tests
     if: build.pull_request.labels includes "continuous-build" || pipeline.id == "0189e759-8c96-4302-b6b5-b4274406bf89" || pipeline.id == "018f4f1e-1b73-4906-9802-92422e3badaa"
     tags:
@@ -98,7 +98,7 @@
         python: ["3.12"]
         worker_id: ["0", "1"]

-  - label: ":database: data: arrow latest {{matrix.python}} tests (data_non_parallel)"
+  - label: ":database: data: arrow v18 {{matrix.python}} tests (data_non_parallel)"
     key: datal_python_non_parallel_tests
     if: build.pull_request.labels includes "continuous-build" || pipeline.id == "0189e759-8c96-4302-b6b5-b4274406bf89" || pipeline.id == "018f4f1e-1b73-4906-9802-92422e3badaa"
     tags:
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -153,6 +153,14 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )

+py_test(
+    name = "test_aggregate",
+    size = "small",
+    srcs = ["tests/test_aggregate.py"],
+    tags = ["team:data", "exclusive"],
+    deps = ["//:ray_lib", ":conftest"],
+)
+
 py_test(
     name = "test_avro",
     size = "small",
6 changes: 4 additions & 2 deletions python/ray/data/_internal/arrow_block.py
@@ -340,7 +340,7 @@ def _sample(self, n_samples: int, sort_key: "SortKey") -> "pyarrow.Table":
         table = self._table.select(sort_key.get_columns())
         return transform_pyarrow.take_table(table, indices)

-    def count(self, on: str) -> Optional[U]:
+    def count(self, on: str, ignore_nulls: bool = False) -> Optional[U]:
         """Count the number of non-null values in the provided column."""
         import pyarrow.compute as pac

@@ -353,8 +353,10 @@ def count(self, on: str) -> Optional[U]:
         if self.num_rows() == 0:
             return None

+        mode = "only_valid" if ignore_nulls else "all"
+
         col = self._table[on]
-        return pac.count(col).as_py()
+        return pac.count(col, mode=mode).as_py()

     def _apply_arrow_compute(
         self, compute_fn: Callable, on: str, ignore_nulls: bool
18 changes: 10 additions & 8 deletions python/ray/data/_internal/pandas_block.py
@@ -64,8 +64,6 @@ class PandasRow(TableRow):
     def __getitem__(self, key: Union[str, List[str]]) -> Any:
         from ray.data.extensions import TensorArrayElement

-        pd = lazy_import_pandas()
-
         def get_item(keys: List[str]) -> Any:
             col = self._row[keys]
             if len(col) == 0:
@@ -75,14 +73,16 @@ def get_item(keys: List[str]) -> Any:
             if isinstance(items.iloc[0], TensorArrayElement):
                 # Getting an item in a Pandas tensor column may return
                 # a TensorArrayElement, which we have to convert to an ndarray.
-                return pd.Series(item.to_numpy() for item in items)
+                return tuple(item.to_numpy() for item in items)

             try:
                 # Try to interpret this as a numpy-type value.
                 # See https://stackoverflow.com/questions/9452775/converting-numpy-dtypes-to-native-python-types. # noqa: E501
-                return pd.Series(item.as_py() for item in items)
+                return tuple(item for item in items)

-            except (AttributeError, ValueError):
+            except (AttributeError, ValueError) as e:
+                logger.warning(f"Failed to convert {items} to a tuple", exc_info=e)
+
                 # Fallback to the original form.
                 return items

@@ -94,7 +94,7 @@ def get_item(keys: List[str]) -> Any:
         if items is None:
             return None
         elif is_single_item:
-            return items.iloc[0]
+            return items[0]
         else:
             return items

@@ -447,8 +447,10 @@ def _apply_agg(

         return val

-    def count(self, on: str) -> Optional[U]:
-        return self._apply_agg(lambda col: col.count(), on)
+    def count(self, on: str, ignore_nulls: bool = False) -> Optional[U]:
+        return self._apply_agg(
+            lambda col: col.count() if ignore_nulls else len(col), on
+        )

     def sum(self, on: str, ignore_nulls: bool) -> Optional[U]:
         if on is not None and not isinstance(on, str):
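The Pandas-side `count` mirrors the same semantics: `Series.count()` already excludes nulls, while `len(col)` counts every row including nulls. For example:

```python
import pandas as pd

col = pd.Series([1.0, None, 3.0])

# ignore_nulls=True: Series.count() skips NaN/None values.
print(int(col.count()))  # 2

# ignore_nulls=False: len() counts all rows, nulls included.
print(len(col))  # 3
```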
2 changes: 1 addition & 1 deletion python/ray/data/_internal/table_block.py
@@ -327,7 +327,7 @@ def iter_groups() -> Iterator[Tuple[Sequence[KeyType], Block]]:
         if len(group_keys) == 1:
             init_vals = group_keys[0]

-        accumulators = [(agg.init(init_vals) if agg.init else None) for agg in aggs]
+        accumulators = [agg.init(init_vals) for agg in aggs]
         for i in range(len(aggs)):
             accessor = BlockAccessor.for_block(group_view)
             # Skip empty blocks
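The table_block change above drops the `if agg.init else None` guard: per the `zero_factory` commit, every aggregation can now produce a "zero" accumulator, so `init` is always well-defined. A hypothetical sketch of the idea (the `AggregateFn` shape and parameter names here are illustrative assumptions, not the library's exact signature):

```python
class AggregateFn:
    """Illustrative aggregation whose init() always yields the monoid identity."""

    def __init__(self, zero_factory, accumulate, combine):
        # zero_factory produces a fresh identity element per group,
        # so accumulators are never shared or None.
        self._zero_factory = zero_factory
        self.accumulate = accumulate
        self.combine = combine

    def init(self, _key):
        return self._zero_factory()


sum_agg = AggregateFn(
    zero_factory=lambda: 0,
    accumulate=lambda acc, v: acc + v,
    combine=lambda a, b: a + b,
)

acc = sum_agg.init(None)
for v in [1, 2, 3]:
    acc = sum_agg.accumulate(acc, v)

# Combining with a fresh zero accumulator leaves the result unchanged,
# which is exactly the identity law the zero must satisfy.
assert sum_agg.combine(acc, sum_agg.init(None)) == acc
print(acc)  # 6
```

Using a factory rather than a shared zero value also avoids the bug fixed in commit 0ea546c, where mutable zero accumulators were shared across groups.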