[Arrow] Unifying Arrow conversion handling, fixing handling of >2Gb objects (ray-project#48487)

## Why are these changes needed?

Addresses ray-project#48419

Currently, we defer to PyArrow to infer the data type used to represent
column values returned by Map-based operators.

However, Arrow does not infer the `large_*` variants of its types even in
fairly trivial cases involving strings, byte-strings, etc., resulting in an
`ArrowCapacityError` whenever a single string or byte-string exceeds 2 GiB.

This change addresses that by:

- Unifying the handling of NumPy/Arrow conversion in a single place
(across the different code paths)
- Fixing incorrect fallbacks to `ArrowPythonObjectType`
- Upcasting `binary` and `string` to their large counterparts (i.e.
`large_binary`, `large_string`) whenever any object being added to the Arrow
array exceeds 2 GiB (see the sketch below)

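For context, a minimal sketch of the failure mode and the upcast (a
hypothetical snippet, not code from this PR; it assumes enough memory to
materialize a >2 GiB payload):

```python
import pyarrow as pa

# A single byte-string just over Arrow's 2 GiB limit for int32 offsets.
payload = b"x" * (2**31 + 1)

# PyArrow infers the `binary` type (int32 offsets), so this overflows.
try:
    pa.array([payload])
except pa.lib.ArrowCapacityError as e:
    print(f"inference failed: {e}")

# Explicitly upcasting to `large_binary` (int64 offsets) succeeds.
arr = pa.array([payload], type=pa.large_binary())
assert arr.type == pa.large_binary()
```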

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alexeykudinkin authored Nov 19, 2024
1 parent 73c956e commit b5934e5
Showing 26 changed files with 602 additions and 204 deletions.
8 changes: 8 additions & 0 deletions python/ray/air/BUILD
@@ -46,6 +46,14 @@ py_test(
     deps = [":ml_lib"]
 )
 
+py_test(
+    name = "test_arrow",
+    size = "small",
+    srcs = ["tests/test_arrow.py"],
+    tags = ["team:ml", "team:data", "ray_data", "exclusive"],
+    deps = [":ml_lib"]
+)
+
 py_test(
     name = "test_air_usage",
     size = "small",
71 changes: 71 additions & 0 deletions python/ray/air/tests/test_arrow.py
@@ -0,0 +1,71 @@
from dataclasses import dataclass, field

import pyarrow as pa
import pytest

from ray.air.util.tensor_extensions.arrow import (
    ArrowConversionError,
    _convert_to_pyarrow_native_array,
    _infer_pyarrow_type,
    convert_to_pyarrow_array,
)
from ray.air.util.tensor_extensions.utils import create_ragged_ndarray


@dataclass
class UserObj:
    i: int = field()


def test_pa_infer_type_failing_to_infer():
    # Represents a single column that will be using the
    # `ArrowPythonObjectExtension` type to ser/de native Python objects
    # into bytes
    column_vals = create_ragged_ndarray(
        [
            "hi",
            1,
            None,
            [[[[]]]],
            {"a": [[{"b": 2, "c": UserObj(i=123)}]]},
            UserObj(i=456),
        ]
    )

    inferred_dtype = _infer_pyarrow_type(column_vals)

    # Arrow (17.0) seems to fall back to assuming the dtype of the first element
    assert pa.string().equals(inferred_dtype)


def test_convert_to_pyarrow_array_object_ext_type_fallback():
    column_values = create_ragged_ndarray(
        [
            "hi",
            1,
            None,
            [[[[]]]],
            {"a": [[{"b": 2, "c": UserObj(i=123)}]]},
            UserObj(i=456),
        ]
    )
    column_name = "py_object_column"

    # First, assert that straightforward conversion into Arrow native types fails
    with pytest.raises(ArrowConversionError) as exc_info:
        _convert_to_pyarrow_native_array(column_values, column_name)

    assert (
        str(exc_info.value)
        == "Error converting data to Arrow: ['hi' 1 None list([[[[]]]]) {'a': [[{'b': 2, 'c': UserObj(i=123)}]]}\n UserObj(i=456)]"  # noqa: E501
    )

    # Subsequently, assert that the fallback to `ArrowObjectExtensionType` succeeds
    pa_array = convert_to_pyarrow_array(column_values, column_name)

    assert pa_array.to_pylist() == column_values.tolist()


if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", "-x", __file__]))
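Note that the fallback exercised by the second test can be turned off
through the `DataContext` flag referenced in the conversion code further
below; a minimal sketch of that knob (assuming the flag introduced by this
change, with the fallback enabled by default):

```python
import numpy as np

from ray.air.util.tensor_extensions.arrow import convert_to_pyarrow_array
from ray.air.util.tensor_extensions.utils import create_ragged_ndarray
from ray.data import DataContext

# With the fallback disabled, conversion of non-Arrow-native values
# re-raises the original `ArrowConversionError` instead of pickling
# them into an `ArrowPythonObjectArray`.
DataContext.get_current().enable_fallback_to_arrow_object_ext_type = False

values = create_ragged_ndarray(["hi", 1, None, object()])
try:
    convert_to_pyarrow_array(values, "mixed_column")
except Exception as e:  # ArrowConversionError
    print(f"conversion failed as expected: {e!r}")
```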
10 changes: 5 additions & 5 deletions python/ray/air/tests/test_object_extension.py
@@ -7,13 +7,13 @@
 from ray.air.util.object_extensions.arrow import (
     ArrowPythonObjectArray,
     ArrowPythonObjectType,
-    object_extension_type_allowed,
+    _object_extension_type_allowed,
 )
 from ray.air.util.object_extensions.pandas import PythonObjectArray
 
 
 @pytest.mark.skipif(
-    not object_extension_type_allowed(), reason="Object extension not supported."
+    not _object_extension_type_allowed(), reason="Object extension not supported."
 )
 def test_object_array_validation():
     # Test unknown input type raises TypeError.
@@ -25,7 +25,7 @@ def test_object_array_validation():
 
 
 @pytest.mark.skipif(
-    not object_extension_type_allowed(), reason="Object extension not supported."
+    not _object_extension_type_allowed(), reason="Object extension not supported."
 )
 def test_arrow_scalar_object_array_roundtrip():
     arr = np.array(
@@ -41,7 +41,7 @@ def test_arrow_scalar_object_array_roundtrip():
 
 
 @pytest.mark.skipif(
-    not object_extension_type_allowed(), reason="Object extension not supported."
+    not _object_extension_type_allowed(), reason="Object extension not supported."
 )
 def test_arrow_python_object_array_slice():
     arr = np.array(["test", 20, "test2", 40, "test3", 60], dtype=object)
@@ -51,7 +51,7 @@ def test_arrow_python_object_array_slice():
 
 
 @pytest.mark.skipif(
-    not object_extension_type_allowed(), reason="Object extension not supported."
+    not _object_extension_type_allowed(), reason="Object extension not supported."
 )
 def test_arrow_pandas_roundtrip():
     obj = types.SimpleNamespace(a=1, b="test")
6 changes: 4 additions & 2 deletions python/ray/air/util/object_extensions/arrow.py
@@ -16,7 +16,7 @@
 PYARROW_VERSION = None if _VER is None else parse_version(_VER)
 
 
-def object_extension_type_allowed() -> bool:
+def _object_extension_type_allowed() -> bool:
     return (
         PYARROW_VERSION is not None
         and PYARROW_VERSION >= MIN_PYARROW_VERSION_SCALAR_SUBCLASS
@@ -104,7 +104,9 @@ def from_objects(
         arr = pa.array(all_dumped_bytes, type=type_.storage_type)
         return ArrowPythonObjectArray.from_storage(type_, arr)
 
-    def to_numpy(self, zero_copy_only: bool = False) -> np.ndarray:
+    def to_numpy(
+        self, zero_copy_only: bool = False, writable: bool = False
+    ) -> np.ndarray:
         arr = np.empty(len(self), dtype=object)
         arr[:] = self.to_pylist()
         return arr
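For reference, a short sketch of the pickled-object roundtrip gated by
`_object_extension_type_allowed` (mirroring the existing tests; it assumes a
PyArrow version with extension-scalar support):

```python
import types

import numpy as np

from ray.air.util.object_extensions.arrow import (
    ArrowPythonObjectArray,
    _object_extension_type_allowed,
)

if _object_extension_type_allowed():
    # Arbitrary Python objects are pickled into an Arrow extension array...
    objs = np.array([types.SimpleNamespace(a=1), "mixed", 42], dtype=object)
    arr = ArrowPythonObjectArray.from_objects(objs)

    # ...and round-trip back intact.
    assert arr.to_pylist() == objs.tolist()
```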
152 changes: 147 additions & 5 deletions python/ray/air/util/tensor_extensions/arrow.py
@@ -10,10 +10,14 @@
 from packaging.version import parse as parse_version
 
 from ray._private.utils import _get_pyarrow_version
+from ray.air.constants import TENSOR_COLUMN_NAME
 from ray.air.util.tensor_extensions.utils import (
+    _is_ndarray_tensor,
     _is_ndarray_variable_shaped_tensor,
     create_ragged_ndarray,
 )
+from ray.data._internal.util import GiB
+from ray.util import log_once
 from ray.util.annotations import DeveloperAPI, PublicAPI
 
 PYARROW_VERSION = _get_pyarrow_version()
@@ -85,14 +89,152 @@ def pyarrow_table_from_pydict(
         raise ArrowConversionError(str(pydict)) from e
 
 
-@DeveloperAPI
-def convert_list_to_pyarrow_array(
-    val: List[Any], enclosing_dict: Dict[str, Any]
+@DeveloperAPI(stability="alpha")
+def convert_to_pyarrow_array(column_values: np.ndarray, column_name: str) -> pa.Array:
+    """Converts provided NumPy `ndarray` into PyArrow's `array` while utilizing
+    both Arrow's natively supported types as well as custom extension types:
+
+    - ArrowTensorArray (for tensors)
+    - ArrowPythonObjectArray (for user-defined Python class objects, as well as
+      any Python objects that aren't represented by a corresponding Arrow native
+      scalar type)
+    """
+
+    try:
+        # Since Arrow does NOT support tensors (aka multidimensional arrays)
+        # natively, we have to make sure that we handle this case utilizing the
+        # `ArrowTensorArray` extension type
+        if column_name == TENSOR_COLUMN_NAME or _is_ndarray_tensor(column_values):
+            from ray.data.extensions.tensor_extension import ArrowTensorArray
+
+            return ArrowTensorArray.from_numpy(column_values, column_name)
+        else:
+            return _convert_to_pyarrow_native_array(column_values, column_name)
+
+    except ArrowConversionError as ace:
+        from ray.data.extensions.object_extension import (
+            ArrowPythonObjectArray,
+            _object_extension_type_allowed,
+        )
+
+        if not _object_extension_type_allowed():
+            should_serialize_as_object_ext_type = False
+            object_ext_type_detail = (
+                "skipping fallback to serialize as pickled python"
+                f" objects (due to unsupported Arrow version {PYARROW_VERSION}, "
+                f"min required version is {MIN_PYARROW_VERSION_SCALAR_SUBCLASS})"
+            )
+        else:
+            from ray.data import DataContext
+
+            if not DataContext.get_current().enable_fallback_to_arrow_object_ext_type:
+                should_serialize_as_object_ext_type = False
+                object_ext_type_detail = (
+                    "skipping fallback to serialize as pickled python objects "
+                    "(due to DataContext.enable_fallback_to_arrow_object_ext_type "
+                    "= False)"
+                )
+            else:
+                should_serialize_as_object_ext_type = True
+                object_ext_type_detail = (
+                    "falling back to serialize as pickled python objects"
+                )
+
+        # NOTE: To avoid logging the following warning for every block, it's
+        #       only logged in the following cases:
+        #       - When the fallback is disabled, or
+        #       - When it's being logged for the first time
+        if not should_serialize_as_object_ext_type or log_once(
+            "_fallback_to_arrow_object_extension_type_warning"
+        ):
+            logger.warning(
+                f"Failed to convert column '{column_name}' into pyarrow "
+                f"array due to: {ace}; {object_ext_type_detail}",
+                exc_info=ace,
+            )
+
+        # If `ArrowPythonObjectType` is not supported, raise the original exception
+        if not should_serialize_as_object_ext_type:
+            raise
+
+        # Otherwise, attempt to fall back to serializing as python objects
+        return ArrowPythonObjectArray.from_objects(column_values)
+
+
+def _convert_to_pyarrow_native_array(
+    column_values: np.ndarray, column_name: str
 ) -> pa.Array:
+    """Converts provided NumPy `ndarray` into PyArrow's `array` while only utilizing
+    Arrow's natively supported types (i.e. no custom extension types)"""
+
     try:
-        return pa.array(val)
+        # NOTE: We explicitly infer the PyArrow `DataType` so that
+        # we can perform upcasting to be able to accommodate
+        # blocks that are larger than 2 GiB in size (limited
+        # by the int32 offsets used by Arrow internally)
+        dtype = _infer_pyarrow_type(column_values)
+
+        logger.log(
+            logging.getLevelName("TRACE"),
+            f"Inferred dtype of '{dtype}' for column '{column_name}'",
+        )
+
+        return pa.array(column_values, type=dtype)
     except Exception as e:
-        raise ArrowConversionError(str(enclosing_dict)) from e
+        raise ArrowConversionError(str(column_values)) from e
+
+
+def _infer_pyarrow_type(column_values: np.ndarray) -> Optional[pa.DataType]:
+    """Infers the target PyArrow `DataType` based on the provided columnar values.
+
+    NOTE: This is a wrapper on top of the `pa.infer_type(...)` utility,
+    additionally upcasting `binary` and `string` types to the corresponding
+    `large_binary` and `large_string` types in case any of the array
+    elements exceeds 2 GiB in size, which would make it impossible for the
+    original types to accommodate such values.
+
+    Unfortunately, for unknown reasons PyArrow doesn't perform that
+    upcasting itself, hence we have to perform it manually.
+
+    Args:
+        column_values: List of columnar values
+
+    Returns:
+        Instance of PyArrow's `DataType` based on the provided
+        column values
+    """
+
+    if len(column_values) == 0:
+        return None
+
+    inferred_pa_dtype = pa.infer_type(column_values)
+
+    def _len_gt_2gb(obj: Any) -> bool:
+        # NOTE: This utility could be seeing objects other than strings or bytes
+        #       in cases when a column contains non-scalar, non-homogeneous object
+        #       types as column values, making Arrow unable to infer the
+        #       corresponding column type appropriately and hence falling back
+        #       to assume the type of the first element in the list.
+        #
+        #       Check out the test cases for this method for additional context.
+        if isinstance(obj, (str, bytes)):
+            return len(obj) > 2 * GiB
+
+        return False
+
+    if pa.types.is_binary(inferred_pa_dtype) and any(
+        [_len_gt_2gb(v) for v in column_values]
+    ):
+        return pa.large_binary()
+    elif pa.types.is_string(inferred_pa_dtype) and any(
+        [_len_gt_2gb(v) for v in column_values]
+    ):
+        return pa.large_string()
+
+    return inferred_pa_dtype
 
 
 @DeveloperAPI
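Taken together, a sketch of how the unified entry point routes the two
interesting cases (names taken from the diff above; it assumes the
object-extension fallback is available):

```python
import numpy as np

from ray.air.util.tensor_extensions.arrow import convert_to_pyarrow_array
from ray.air.util.tensor_extensions.utils import create_ragged_ndarray

# A multidimensional column takes the `ArrowTensorArray` path.
tensor_arr = convert_to_pyarrow_array(np.zeros((4, 2, 2)), "image")
print(type(tensor_arr).__name__)  # ArrowTensorArray

# Mixed Python objects fail native conversion and fall back to being
# pickled into an `ArrowPythonObjectArray`.
object_arr = convert_to_pyarrow_array(
    create_ragged_ndarray(["hi", 1, None, {"a": 1}]), "objects"
)
print(object_arr.to_pylist())  # ['hi', 1, None, {'a': 1}]
```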
25 changes: 22 additions & 3 deletions python/ray/air/util/tensor_extensions/utils.py
@@ -9,9 +9,28 @@
 from pandas.core.dtypes.generic import ABCSeries
 
 
+def _is_ndarray_tensor(arr: np.ndarray) -> bool:
+    """Return whether the provided NumPy ndarray is comprised of tensors.
+
+    NOTE: Tensor is defined as a NumPy array such that `len(arr.shape) > 1`
+    """
+
+    # Case of a uniform-shaped (ie non-ragged) tensor
+    if arr.ndim > 1:
+        return True
+
+    # Case of a ragged tensor (as produced by the `create_ragged_ndarray` utility)
+    elif (
+        arr.dtype.type is np.object_ and len(arr) > 0 and isinstance(arr[0], np.ndarray)
+    ):
+        return True
+
+    return False
+
+
 def _is_ndarray_variable_shaped_tensor(arr: np.ndarray) -> bool:
-    """Return whether the provided NumPy ndarray is representing a variable-shaped
-    tensor.
+    """Return whether the provided NumPy ndarray is comprised of variable-shaped
+    tensors.
 
     NOTE: This is an O(rows) check.
     """
@@ -69,7 +88,7 @@ def _create_possibly_ragged_ndarray(
 
 
 @PublicAPI(stability="alpha")
-def create_ragged_ndarray(values: Sequence[np.ndarray]) -> np.ndarray:
+def create_ragged_ndarray(values: Sequence[Any]) -> np.ndarray:
     """Create an array that contains arrays of different length
 
     If you're working with variable-length arrays like images, use this function to
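As a quick illustration of the new `_is_ndarray_tensor` predicate (expected
results follow directly from the code above):

```python
import numpy as np

from ray.air.util.tensor_extensions.utils import (
    _is_ndarray_tensor,
    create_ragged_ndarray,
)

# Uniform-shaped tensor column: `ndim > 1`.
print(_is_ndarray_tensor(np.zeros((3, 2, 2))))  # True

# Ragged tensor column: object dtype whose elements are ndarrays.
ragged = create_ragged_ndarray([np.zeros((2, 2)), np.zeros((3, 3))])
print(_is_ndarray_tensor(ragged))  # True

# Plain 1D column of scalars: not a tensor.
print(_is_ndarray_tensor(np.array([1, 2, 3])))  # False
```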
2 changes: 1 addition & 1 deletion python/ray/data/BUILD
@@ -163,7 +163,7 @@ py_test(
 
 py_test(
     name = "test_binary",
-    size = "small",
+    size = "medium",
     srcs = ["tests/test_binary.py"],
     tags = ["team:data", "exclusive"],
     deps = ["//:ray_lib", ":conftest"],