[Dataset] implement from_spark, to_spark and some optimizations #17340
Merged
Changes from 6 commits
Commits
24 commits
afb1752 from arrow need a from bytes for java written data (kira-lin)
619404a add bytes to block type (kira-lin)
0c8de45 owner address is needed for ensure locality (kira-lin)
0958dbe Merge remote-tracking branch 'upstream/master' into fix-from-arrow (kira-lin)
cf9f4f4 use raydp provided functions (kira-lin)
68e109f format (kira-lin)
dec202e add a developer api for getting block locations (kira-lin)
93994c9 fix; add test (kira-lin)
48b99c1 Merge remote-tracking branch 'upstream/master' into fix-from-arrow (kira-lin)
407a7e6 fix (kira-lin)
0185788 Merge remote-tracking branch 'upstream/master' into fix-from-arrow (kira-lin)
6200f00 update (kira-lin)
6a27cac format (kira-lin)
2bfc10a add raydp in test dependency (kira-lin)
8a7e3bf Merge remote-tracking branch 'upstream/master' into fix-from-arrow (kira-lin)
6f31d7e Merge remote-tracking branch 'upstream/master' into fix-from-arrow (kira-lin)
b2b5a6f get object locations in ray (kira-lin)
00272ca move test location (kira-lin)
086f571 Merge remote-tracking branch 'upstream/master' into fix-from-arrow (kira-lin)
d99c7f4 move raydp dependency (kira-lin)
c4f62d2 passing only blocks and schema (kira-lin)
edfae6e Update python/ray/data/read_api.py (kira-lin)
c1a59d2 minor updates (kira-lin)
2734119 lint (kira-lin)
@@ -477,13 +477,14 @@ def from_pandas(dfs: List[ObjectRef["pandas.DataFrame"]],


 @PublicAPI(stability="beta")
-def from_arrow(tables: List[ObjectRef["pyarrow.Table"]],
+def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]],
                *,
                parallelism: int = 200) -> Dataset[ArrowRow]:
Review comment: Remove? This doesn't seem to be used.
     """Create a dataset from a set of Arrow tables.

     Args:
-        dfs: A list of Ray object references to Arrow tables.
+        tables: A list of Ray object references to Arrow tables,
+            or its streaming format in bytes.
         parallelism: The amount of parallelism to use for the dataset.
Review comment: Remove?

     Returns:

@@ -501,13 +502,15 @@ def from_spark(df: "pyspark.sql.DataFrame", *,
     """Create a dataset from a Spark dataframe.

     Args:
-        spark: A SparkSession, which must be created by RayDP (Spark-on-Ray).
+        df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray).
         parallelism: The amount of parallelism to use for the dataset.

     Returns:
         Dataset holding Arrow records read from the dataframe.
     """
-    raise NotImplementedError  # P2
+    import raydp
+    return raydp.spark.spark_dataframe_to_ray_dataset(df, parallelism)


 def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]:
This function is added because locations of objects need to be passed to java, so that we can ensure locality in spark.
Are you calling this from RayDP? This isn't a public API, so we should call it only from within Ray code (and pass the addresses to external libraries).
Is this comment addressed?
Well I added the developer api, I'm going to directly call it in to_spark now.
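To illustrate why block locations matter for locality, here is a rough sketch of how a consumer might group blocks by the node that holds them once the locations are known. Everything in it is an assumption made for illustration: the function `group_blocks_by_node`, the block and node identifiers, and the `"any"` fallback are hypothetical, and this is not the developer API added in this PR nor RayDP's scheduling logic.

```python
from collections import defaultdict
from typing import Dict, List


def group_blocks_by_node(locations: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Group block ids under the first node listed for each block.

    Blocks with no known location are placed under the key "any".
    """
    by_node = defaultdict(list)
    for block_id, nodes in locations.items():
        preferred = nodes[0] if nodes else "any"
        by_node[preferred].append(block_id)
    return dict(by_node)


# Hypothetical location map: block id -> nodes holding a copy of the block.
locations = {
    "block-0": ["node-a"],
    "block-1": ["node-b", "node-a"],
    "block-2": [],
}
plan = group_blocks_by_node(locations)
```

A reader task scheduled on `node-a` could then be handed `plan["node-a"]` so that it reads blocks already present on its own node.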