
[Dataset] implement from_spark, to_spark and some optimizations #17340

Merged Sep 9, 2021 · 24 commits

Commits
afb1752
from arrow need a from bytes for java written data
kira-lin Jul 26, 2021
619404a
add bytes to block type
kira-lin Jul 29, 2021
0c8de45
owner address is needed for ensure locality
kira-lin Aug 17, 2021
0958dbe
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Aug 17, 2021
cf9f4f4
use raydp provided functions
kira-lin Aug 17, 2021
68e109f
format
kira-lin Aug 18, 2021
dec202e
add a developer api for getting block locations
kira-lin Aug 23, 2021
93994c9
fix; add test
kira-lin Aug 27, 2021
48b99c1
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Aug 27, 2021
407a7e6
fix
kira-lin Aug 27, 2021
0185788
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Aug 30, 2021
6200f00
update
kira-lin Sep 2, 2021
6a27cac
format
kira-lin Sep 2, 2021
2bfc10a
add raydp in test dependency
kira-lin Sep 2, 2021
8a7e3bf
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Sep 2, 2021
6f31d7e
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Sep 6, 2021
b2b5a6f
get object locations in ray
kira-lin Sep 7, 2021
00272ca
move test location
kira-lin Sep 7, 2021
086f571
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Sep 7, 2021
d99c7f4
move raydp dependency
kira-lin Sep 7, 2021
c4f62d2
passing only blocks and schema
kira-lin Sep 7, 2021
edfae6e
Update python/ray/data/read_api.py
kira-lin Sep 8, 2021
c1a59d2
minor updates
kira-lin Sep 8, 2021
2734119
lint
kira-lin Sep 9, 2021
6 changes: 6 additions & 0 deletions python/ray/_raylet.pyx
@@ -1720,6 +1720,12 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().RemoveLocalReference(
c_object_id)

def get_owner_address(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
c_object_id).SerializeAsString()
Comment on lines +1723 to +1727
Contributor Author

This function is added because the locations of objects need to be passed to Java, so that we can ensure locality in Spark.

Contributor

Are you calling this from RayDP? This isn't a public API, so we should call it only from within Ray code (and pass the addresses to external libraries).

Contributor

Is this comment addressed?

Contributor Author

Well, I added the developer API; I'm going to call it directly in to_spark now.


def serialize_and_promote_object_ref(self, ObjectRef object_ref):
cdef:
CObjectID c_object_id = object_ref.native()
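
For illustration, a minimal sketch of how the new hook is meant to be used from Python (it is the same call Dataset.to_spark() makes below). This is an internal developer API, and the snippet assumes a local ray.init():

import ray

ray.init()

ref = ray.put([1, 2, 3])
core_worker = ray.worker.global_worker.core_worker
# Serialized owner address for the object (a protobuf Address as bytes).
# to_spark() collects one of these per block and passes them to RayDP's
# Java side so Spark tasks can be scheduled next to the data.
owner_address = core_worker.get_owner_address(ref)
assert isinstance(owner_address, bytes)
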
6 changes: 5 additions & 1 deletion python/ray/data/block.py
@@ -17,7 +17,7 @@
#
# Block data can be accessed in a uniform way via ``BlockAccessors`` such as
# ``SimpleBlockAccessor``, ``ArrowBlockAccessor``, and ``TensorBlockAccessor``.
Block = Union[List[T], np.ndarray, "pyarrow.Table"]
Block = Union[List[T], np.ndarray, "pyarrow.Table", bytes]


@DeveloperAPI
@@ -124,6 +124,10 @@ def for_block(block: Block) -> "BlockAccessor[T]":
from ray.data.impl.arrow_block import \
ArrowBlockAccessor
return ArrowBlockAccessor(block)
elif isinstance(block, bytes):
from ray.data.impl.arrow_block import \
ArrowBlockAccessor
return ArrowBlockAccessor.from_bytes(block)
elif isinstance(block, list):
from ray.data.impl.simple_block import \
SimpleBlockAccessor
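
A minimal sketch of the extended dispatch, assuming pyarrow is installed: lists and Arrow tables resolve to their accessors as before, while a bytes block is treated as an Arrow table in the IPC streaming format and deserialized through ArrowBlockAccessor.from_bytes (added in the arrow_block.py diff below):

import pyarrow as pa
from ray.data.block import BlockAccessor
from ray.data.impl.arrow_block import ArrowBlockAccessor
from ray.data.impl.simple_block import SimpleBlockAccessor

# Plain Python lists still dispatch to the simple accessor...
assert isinstance(BlockAccessor.for_block([1, 2, 3]), SimpleBlockAccessor)

# ...and in-memory Arrow tables to the Arrow accessor, as before.
table = pa.table({"one": [1, 2, 3], "two": ["a", "b", "c"]})
assert isinstance(BlockAccessor.for_block(table), ArrowBlockAccessor)

# A bytes block holds the same table in the Arrow IPC streaming format,
# the representation RayDP's Java executors hand to Ray.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
accessor = BlockAccessor.for_block(sink.getvalue().to_pybytes())
assert isinstance(accessor, ArrowBlockAccessor)
assert accessor.num_rows() == 3
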
12 changes: 10 additions & 2 deletions python/ray/data/dataset.py
@@ -1257,15 +1257,23 @@ def to_modin(self) -> "modin.DataFrame":
pd_objs = self.to_pandas()
return from_partitions(pd_objs, axis=0)

def to_spark(self) -> "pyspark.sql.DataFrame":
def to_spark(self,
spark: "pyspark.sql.SparkSession") -> "pyspark.sql.DataFrame":
"""Convert this dataset into a Spark dataframe.

Time complexity: O(dataset size / parallelism)

Returns:
A Spark dataframe created from this dataset.
"""
raise NotImplementedError # P2
import raydp
core_worker = ray.worker.global_worker.core_worker
locations = [
core_worker.get_owner_address(block)
for block in self.get_blocks()
]
return raydp.spark.ray_dataset_to_spark_dataframe(
spark, self.schema(), self.get_blocks(), locations)

def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]:
"""Convert this dataset into a distributed set of Pandas dataframes.
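
A usage sketch for the new signature, assuming raydp is installed; the app name and executor sizes below are arbitrary, and the SparkSession must come from RayDP (Spark-on-Ray):

import ray
import raydp

ray.init()
spark = raydp.init_spark("to_spark_example", 1, 1, "500M")

ds = ray.data.range_arrow(100)
# Blocks are handed to Spark together with their owner addresses so that
# Spark partitions can be placed next to the Ray objects that back them.
df = ds.to_spark(spark)
assert df.count() == 100

raydp.stop_spark()
ray.shutdown()
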
5 changes: 5 additions & 0 deletions python/ray/data/impl/arrow_block.py
@@ -135,6 +135,11 @@ def __init__(self, table: "pyarrow.Table"):
raise ImportError("Run `pip install pyarrow` for Arrow support")
self._table = table

@classmethod
def from_bytes(cls, data: bytes):
reader = pyarrow.ipc.open_stream(data)
return cls(reader.read_all())

def iter_rows(self) -> Iterator[ArrowRow]:
outer = self

18 changes: 12 additions & 6 deletions python/ray/data/read_api.py
@@ -472,7 +472,6 @@ def from_pandas(dfs: List[ObjectRef["pandas.DataFrame"]]) -> Dataset[ArrowRow]:
return Dataset(BlockList(blocks, ray.get(list(metadata))))


@PublicAPI(stability="beta")
def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[np.ndarray]:
"""Create a dataset from a set of NumPy ndarrays.

@@ -490,34 +489,41 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[np.ndarray]:


@PublicAPI(stability="beta")
def from_arrow(tables: List[ObjectRef["pyarrow.Table"]]) -> Dataset[ArrowRow]:
def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]],
*,
parallelism: int = 200) -> Dataset[ArrowRow]:
Contributor

Remove? This doesn't seem to be used.

"""Create a dataset from a set of Arrow tables.

Args:
dfs: A list of Ray object references to Arrow tables.
tables: A list of Ray object references to Arrow tables,
or to their bytes in the Arrow IPC streaming format.
parallelism: The amount of parallelism to use for the dataset.
Contributor

Remove?


Returns:
Dataset holding Arrow records from the tables.
"""

get_metadata = cached_remote_fn(_get_metadata)
metadata = [get_metadata.remote(t) for t in tables]
return Dataset(BlockList(tables, ray.get(metadata)))


@PublicAPI(stability="beta")
def from_spark(df: "pyspark.sql.DataFrame", *,
parallelism: int = 200) -> Dataset[ArrowRow]:
parallelism: int = 0) -> Dataset[ArrowRow]:
"""Create a dataset from a Spark dataframe.

Args:
spark: A SparkSession, which must be created by RayDP (Spark-on-Ray).
df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray).
parallelism: The amount of parallelism to use for the dataset.
If not provided, it will be equal to the number of partitions of the
Contributor

indent

original Spark dataframe.

Returns:
Dataset holding Arrow records read from the dataframe.
"""
raise NotImplementedError # P2
import raydp
return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)


def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]:
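
A hedged sketch of the widened from_arrow input type (the from_spark path itself is exercised by the new test file below): an object reference may now hold either a pyarrow.Table or its Arrow IPC stream bytes, and both forms end up in the same Arrow-backed dataset:

import pyarrow as pa
import ray

ray.init()

table = pa.table({"one": [1, 2, 3], "two": ["a", "b", "c"]})

# Serialize a copy in the IPC streaming format, as a JVM producer would.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
ipc_bytes = sink.getvalue().to_pybytes()

# One ref holds the table itself, the other its serialized bytes.
ds = ray.data.from_arrow([ray.put(table), ray.put(ipc_bytes)])
assert ds.count() == 6
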
44 changes: 44 additions & 0 deletions python/ray/data/tests/test_raydp_dataset.py
@@ -0,0 +1,44 @@
import pytest
import ray
import raydp


@pytest.fixture(scope="function")
def spark_on_ray_small(request):
ray.init(num_cpus=2, include_dashboard=False)
spark = raydp.init_spark("test", 1, 1, "500 M")

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)
return spark


def test_raydp_roundtrip(spark_on_ray_small):
spark = spark_on_ray_small
spark_df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")],
["one", "two"])
rows = [(r.one, r.two) for r in spark_df.take(3)]
ds = ray.data.from_spark(spark_df)
values = [(r["one"], r["two"]) for r in ds.take(6)]
assert values == rows
df = ds.to_spark(spark)
rows_2 = [(r.one, r.two) for r in df.take(3)]
assert values == rows_2


def test_raydp_to_spark(spark_on_ray_small):
spark = spark_on_ray_small
n = 5
ds = ray.data.range_arrow(n)
values = [r["value"] for r in ds.take(5)]
df = ds.to_spark(spark)
rows = [r.value for r in df.take(5)]
assert values == rows


if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
1 change: 1 addition & 0 deletions python/requirements/data_processing/requirements.txt
@@ -7,3 +7,4 @@ s3fs
modin>=0.8.3; python_version < '3.7'
modin>=0.10.0; python_version >= '3.7'
pytest-repeat
raydp-nightly