Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions python/ray/data/tests/test_execution_optimizer_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from packaging.version import parse as parse_version

import ray
from ray.data._internal.util import rows_same
from ray.data._internal.utils.arrow_utils import get_pyarrow_version
from ray.data.tests.conftest import * # noqa
from ray.data.tests.test_util import _check_usage_record
Expand Down Expand Up @@ -63,28 +64,24 @@ def test_from_pandas_refs_e2e(ray_start_regular_shared_2_cpus, enable_pandas_blo
try:
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
expected_df = pd.concat([df1, df2])

ds = ray.data.from_pandas_refs([ray.put(df1), ray.put(df2)])
values = [(r["one"], r["two"]) for r in ds.take(6)]
rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()]
assert values == rows
assert rows_same(ds.to_pandas(), expected_df)
# Check that metadata fetch is included in stats.
assert "FromPandas" in ds.stats()
assert ds._plan._logical_plan.dag.name == "FromPandas"

# Test chaining multiple operations
ds2 = ds.map_batches(lambda x: x)
values = [(r["one"], r["two"]) for r in ds2.take(6)]
assert values == rows
assert rows_same(ds2.to_pandas(), expected_df)
assert "MapBatches" in ds2.stats()
assert "FromPandas" in ds2.stats()
assert ds2._plan._logical_plan.dag.name == "MapBatches(<lambda>)"

# test from single pandas dataframe
ds = ray.data.from_pandas_refs(ray.put(df1))
values = [(r["one"], r["two"]) for r in ds.take(3)]
rows = [(r.one, r.two) for _, r in df1.iterrows()]
assert values == rows
assert rows_same(ds.to_pandas(), df1)
# Check that metadata fetch is included in stats.
assert "FromPandas" in ds.stats()
assert ds._plan._logical_plan.dag.name == "FromPandas"
Expand Down