[Dataset] implement from_spark, to_spark and some optimizations #17340

Merged
merged 24 commits into from
Sep 9, 2021
Changes from 3 commits
Commits
afb1752
from arrow need a from bytes for java written data
kira-lin Jul 26, 2021
619404a
add bytes to block type
kira-lin Jul 29, 2021
0c8de45
owner address is needed for ensure locality
kira-lin Aug 17, 2021
0958dbe
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Aug 17, 2021
cf9f4f4
use raydp provided functions
kira-lin Aug 17, 2021
68e109f
format
kira-lin Aug 18, 2021
dec202e
add a developer api for getting block locations
kira-lin Aug 23, 2021
93994c9
fix; add test
kira-lin Aug 27, 2021
48b99c1
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Aug 27, 2021
407a7e6
fix
kira-lin Aug 27, 2021
0185788
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Aug 30, 2021
6200f00
update
kira-lin Sep 2, 2021
6a27cac
format
kira-lin Sep 2, 2021
2bfc10a
add raydp in test dependency
kira-lin Sep 2, 2021
8a7e3bf
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Sep 2, 2021
6f31d7e
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Sep 6, 2021
b2b5a6f
get object locations in ray
kira-lin Sep 7, 2021
00272ca
move test location
kira-lin Sep 7, 2021
086f571
Merge remote-tracking branch 'upstream/master' into fix-from-arrow
kira-lin Sep 7, 2021
d99c7f4
move raydp dependency
kira-lin Sep 7, 2021
c4f62d2
passing only blocks and schema
kira-lin Sep 7, 2021
edfae6e
Update python/ray/data/read_api.py
kira-lin Sep 8, 2021
c1a59d2
minor updates
kira-lin Sep 8, 2021
2734119
lint
kira-lin Sep 9, 2021
15 changes: 6 additions & 9 deletions python/ray/data/dataset.py
@@ -1267,7 +1267,12 @@ def to_spark(self,
             A Spark dataframe created from this dataset.
         """
         import raydp
-        return raydp.spark.ray_dataset_to_spark_dataframe(spark, self)
+        core_worker = ray.worker.global_worker.core_worker
+        locations = [
+            core_worker.get_owner_address(block)
+            for block in self.get_blocks()
+        ]
+        return raydp.spark.ray_dataset_to_spark_dataframe(spark, self, locations)
 
     def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]:
         """Convert this dataset into a distributed set of Pandas dataframes.
@@ -1460,14 +1465,6 @@ def get_blocks(self) -> List[ObjectRef[Block]]:
         """
         return list(self._blocks)
 
-    @DeveloperAPI
-    def get_block_locations(self) -> List[bytes]:
-        core_worker = ray.worker.global_worker.core_worker
-        return [
-            core_worker.get_owner_address(block)
-            for block in self.get_blocks()
-        ]
-
     def _split(self, index: int,
                return_right_half: bool) -> ("Dataset[T]", "Dataset[T]"):
         get_num_rows = cached_remote_fn(_get_num_rows)
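
Note for readers of this diff: the change moves the per-block owner-address lookup out of a separate `get_block_locations` helper and into `to_spark` itself. The sketch below only illustrates the shape of that lookup, one `get_owner_address` call per block, in block order. It does not import Ray or RayDP; `StubCoreWorker`, `collect_owner_addresses`, and the string block references are hypothetical stand-ins, and the fake address format is an assumption made purely for illustration.

```python
from typing import List


class StubCoreWorker:
    """Hypothetical stand-in for ray.worker.global_worker.core_worker."""

    def get_owner_address(self, block_ref: str) -> bytes:
        # Assumption: the real call returns an opaque bytes value per block.
        # Here we fabricate a readable placeholder instead.
        return f"owner-of-{block_ref}".encode()


def collect_owner_addresses(core_worker: StubCoreWorker,
                            block_refs: List[str]) -> List[bytes]:
    # Same shape as the comprehension in the diff: one lookup per block,
    # preserving block order so index i matches block i.
    return [core_worker.get_owner_address(ref) for ref in block_refs]


# Placeholder strings stand in for the ObjectRefs returned by get_blocks().
block_refs = ["block-0", "block-1", "block-2"]
locations = collect_owner_addresses(StubCoreWorker(), block_refs)
print(locations)
```

Because the list is built in block order, a consumer can pair each block with its owner address by index, which is presumably what the third argument to `raydp.spark.ray_dataset_to_spark_dataframe` relies on.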