Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions python/ray/data/_internal/arrow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)
from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow
from ray.data._internal.arrow_ops.transform_pyarrow import shuffle
from ray.data._internal.row import TableRow
from ray.data._internal.row import row_repr, row_repr_pretty, row_str
from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
from ray.data.block import (
Block,
Expand Down Expand Up @@ -85,11 +85,14 @@ def get_concat_and_sort_transform(context: DataContext) -> Callable:
return transform_pyarrow.concat_and_sort


class ArrowRow(TableRow):
class ArrowRow(Mapping):
"""
Row of a tabular Dataset backed by an Arrow Table block.
"""

def __init__(self, row: Any):
self._row = row

def __getitem__(self, key: Union[str, List[str]]) -> Any:
from ray.data.extensions import get_arrow_extension_tensor_types

Expand All @@ -101,7 +104,9 @@ def get_item(keys: List[str]) -> Any:
# Build a tensor row.
return tuple(
[
ArrowBlockAccessor._build_tensor_row(self._row, col_name=key)
ArrowBlockAccessor._build_tensor_row(
self._row, col_name=key, row_idx=0
)
for key in keys
]
)
Expand Down Expand Up @@ -142,6 +147,15 @@ def __len__(self):
def as_pydict(self) -> Dict[str, Any]:
return dict(self.items())

def __str__(self):
return row_str(self)

def __repr__(self):
return row_repr(self)

def _repr_pretty_(self, p, cycle):
return row_repr_pretty(self, p, cycle)


class ArrowBlockBuilder(TableBlockBuilder):
def __init__(self):
Expand Down Expand Up @@ -203,6 +217,11 @@ def __init__(self, table: "pyarrow.Table"):
if pyarrow is None:
raise ImportError("Run `pip install pyarrow` for Arrow support")
super().__init__(table)
self._max_chunk_size: Optional[int] = None

def _get_row(self, index: int) -> ArrowRow:
base_row = self.slice(index, index + 1, copy=False)
return ArrowRow(base_row)

def column_names(self) -> List[str]:
return self._table.column_names
Expand Down Expand Up @@ -231,10 +250,10 @@ def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor":

@staticmethod
def _build_tensor_row(
row: ArrowRow, col_name: str = TENSOR_COLUMN_NAME
row: ArrowRow, row_idx: int, col_name: str = TENSOR_COLUMN_NAME
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type hint for the `row` parameter is `ArrowRow`, but this appears to be incorrect. The only call site in this file, `ArrowRow.__getitem__`, passes `self._row`, which is a `pyarrow.Table`. If `row` were an `ArrowRow` instance, the expression `row[col_name]` inside this method would call back into `ArrowRow.__getitem__` recursively instead of indexing the underlying table. To improve type safety and clarity, please update the type hint to `row: "pyarrow.Table"`.

Suggested change
row: ArrowRow, row_idx: int, col_name: str = TENSOR_COLUMN_NAME
row: "pyarrow.Table", row_idx: int, col_name: str = TENSOR_COLUMN_NAME

) -> np.ndarray:

element = row[col_name][0]
element = row[col_name][row_idx]
arr = element.as_py()

assert isinstance(arr, np.ndarray), type(arr)
Expand Down Expand Up @@ -444,16 +463,17 @@ def iter_rows(
) -> Iterator[Union[Mapping, np.ndarray]]:
table = self._table
if public_row_format:
if not hasattr(self, "_max_chunk_size"):
if self._max_chunk_size is None:
# Calling _get_max_chunk_size in constructor makes it slow, so we
# are calling it here only when needed.
self._max_chunk_size = _get_max_chunk_size(
self._table, ARROW_MAX_CHUNK_SIZE_BYTES
table, ARROW_MAX_CHUNK_SIZE_BYTES
)
for batch in table.to_batches(max_chunksize=self._max_chunk_size):
yield from batch.to_pylist()
else:
for i in range(self.num_rows()):
num_rows = self.num_rows()
for i in range(num_rows):
yield self._get_row(i)

def filter(self, predicate_expr: "Expr") -> "pyarrow.Table":
Expand Down
29 changes: 23 additions & 6 deletions python/ray/data/_internal/pandas_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.util.tensor_extensions.utils import _should_convert_to_tensor
from ray.data._internal.numpy_support import convert_to_numpy
from ray.data._internal.row import TableRow
from ray.data._internal.row import row_repr, row_repr_pretty, row_str
from ray.data._internal.table_block import TableBlockAccessor, TableBlockBuilder
from ray.data._internal.util import is_null
from ray.data.block import (
Expand Down Expand Up @@ -61,11 +61,14 @@ def lazy_import_pandas():
return _pandas


class PandasRow(TableRow):
class PandasRow(Mapping):
"""
Row of a tabular Dataset backed by a Pandas DataFrame block.
"""

def __init__(self, row: Any):
self._row = row

def __getitem__(self, key: Union[str, List[str]]) -> Any:
from ray.data.extensions import TensorArrayElement

Expand Down Expand Up @@ -124,6 +127,15 @@ def as_pydict(self) -> Dict[str, Any]:

return pydict

def __str__(self):
return row_str(self)

def __repr__(self):
return row_repr(self)

def _repr_pretty_(self, p, cycle):
return row_repr_pretty(self, p, cycle)


class PandasBlockColumnAccessor(BlockColumnAccessor):
def __init__(self, col: "pandas.Series"):
Expand Down Expand Up @@ -330,6 +342,10 @@ class PandasBlockAccessor(TableBlockAccessor):
def __init__(self, table: "pandas.DataFrame"):
super().__init__(table)

def _get_row(self, index: int) -> PandasRow:
base_row = self.slice(index, index + 1, copy=False)
return PandasRow(base_row)

def column_names(self) -> List[str]:
return self._table.columns.tolist()

Expand All @@ -341,10 +357,10 @@ def fill_column(self, name: str, value: Any) -> Block:
return self._table.assign(**{name: value})

@staticmethod
def _build_tensor_row(row: PandasRow) -> np.ndarray:
def _build_tensor_row(row: PandasRow, row_idx: int) -> np.ndarray:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type hint for the `row` parameter is `PandasRow`, which seems incorrect. If `row` were a `PandasRow` instance, `row[TENSOR_COLUMN_NAME]` would call `PandasRow.__getitem__`, which returns a tuple, and accessing `.iloc[row_idx]` on a tuple would raise an `AttributeError`. The `row` parameter is most likely expected to be a `pandas.DataFrame`. Please update the type hint to `row: "pandas.DataFrame"` for correctness and clarity.

Suggested change
def _build_tensor_row(row: PandasRow, row_idx: int) -> np.ndarray:
def _build_tensor_row(row: "pandas.DataFrame", row_idx: int) -> np.ndarray:

from ray.data.extensions import TensorArrayElement

tensor = row[TENSOR_COLUMN_NAME].iloc[0]
tensor = row[TENSOR_COLUMN_NAME].iloc[row_idx]
if isinstance(tensor, TensorArrayElement):
# Getting an item in a Pandas tensor column may return a TensorArrayElement,
# which we have to convert to an ndarray.
Expand Down Expand Up @@ -664,9 +680,10 @@ def block_type(self) -> BlockType:
def iter_rows(
self, public_row_format: bool
) -> Iterator[Union[Mapping, np.ndarray]]:
for i in range(self.num_rows()):
num_rows = self.num_rows()
for i in range(num_rows):
row = self._get_row(i)
if public_row_format and isinstance(row, TableRow):
if public_row_format:
yield row.as_pydict()
else:
yield row
Expand Down
49 changes: 11 additions & 38 deletions python/ray/data/_internal/row.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,19 @@
import abc
from collections.abc import Mapping
from typing import Any, Dict


class TableRow(Mapping):
"""
A dict-like row of a tabular ``Dataset``.
def row_str(row: Mapping) -> str:
"""Convert a row to string representation."""
return str(row.as_pydict())

This implements the dictionary mapping interface, but provides more
efficient access with less data copying than converting Arrow Tables
or Pandas DataFrames into per-row dicts. This class must be subclassed,
with subclasses implementing ``__getitem__``, ``__iter__``, and ``__len__``.

Concrete subclasses include ``ray.data._internal.arrow_block.ArrowRow`` and
``ray.data._internal.pandas_block.PandasRow``.
"""
def row_repr(row: Mapping) -> str:
"""Convert a row to repr representation."""
return str(row)

def __init__(self, row: Any):
"""
Construct a ``TableRow`` (internal API).

Args:
row: The tabular row that backs this row mapping.
"""
self._row = row
def row_repr_pretty(row: Mapping, p, cycle):
"""Pretty print a row."""
from IPython.lib.pretty import _dict_pprinter_factory

@abc.abstractmethod
def as_pydict(self) -> Dict[str, Any]:
"""Convert to a normal Python dict.

This can create a new copy of the row.
"""
...

def __str__(self):
return str(self.as_pydict())

def __repr__(self):
return str(self)

def _repr_pretty_(self, p, cycle):
from IPython.lib.pretty import _dict_pprinter_factory

pprinter = _dict_pprinter_factory("{", "}")
return pprinter(self, p, cycle)
pprinter = _dict_pprinter_factory("{", "}")
return pprinter(row, p, cycle)
15 changes: 4 additions & 11 deletions python/ray/data/_internal/table_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
Expand All @@ -18,7 +19,6 @@
from ray._private.ray_constants import env_integer
from ray.air.constants import TENSOR_COLUMN_NAME
from ray.data._internal.block_builder import BlockBuilder
from ray.data._internal.row import TableRow
from ray.data._internal.size_estimator import SizeEstimator
from ray.data._internal.util import (
NULL_SENTINEL,
Expand Down Expand Up @@ -73,8 +73,8 @@ def __init__(self, block_type):
self._num_compactions = 0
self._block_type = block_type

def add(self, item: Union[dict, TableRow, np.ndarray]) -> None:
if isinstance(item, TableRow):
def add(self, item: Union[dict, Mapping, np.ndarray]) -> None:
if hasattr(item, "as_pydict"):
item = item.as_pydict()
elif isinstance(item, np.ndarray):
item = {TENSOR_COLUMN_NAME: item}
Expand Down Expand Up @@ -169,22 +169,15 @@ def _compact_if_needed(self) -> None:


class TableBlockAccessor(BlockAccessor):
ROW_TYPE: TableRow = TableRow

def __init__(self, table: Any):
self._table = table

def _get_row(self, index: int, copy: bool = False) -> Union[TableRow, np.ndarray]:
base_row = self.slice(index, index + 1, copy=copy)
row = self.ROW_TYPE(base_row)
return row

@staticmethod
def _munge_conflict(name, count):
return f"{name}_{count + 1}"

@staticmethod
def _build_tensor_row(row: TableRow) -> np.ndarray:
def _build_tensor_row(row: Mapping, row_idx: int) -> np.ndarray:
raise NotImplementedError

def to_default(self) -> Block:
Expand Down