[Dataset] implement from_spark, to_spark and some optimizations #17340
Conversation
Besides, we still have some overhead after data is written to Ray, due to getting metadata. I have a few questions:
Yeah, it's a good question. One advantage of keeping input files per block is that if the dataset is split, we can still show input files per split accurately. It also avoids the need for both global and per-block metadata. But I'm open to adding global metadata if storing it per block is overly redundant.
Note that you only need to ray.get() the metadata, which is quite small. The actual blocks do not need to be fetched.
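As a toy illustration of that point (not the Dataset internals; the produce task and its metadata dict are made up for this sketch), metadata can live in its own small object so that summarizing a dataset never pulls the block payloads:

```python
import ray

ray.init(ignore_reinit_error=True)

# Each task returns two objects: the (large) block and its (tiny) metadata.
@ray.remote(num_returns=2)
def produce(path: str):
    block = list(range(1_000_000))  # stands in for a real block of rows
    meta = {"input_files": [path], "num_rows": len(block)}
    return block, meta

pairs = [produce.remote(f"part-{i}.parquet") for i in range(4)]
# Only the metadata refs are fetched; the blocks themselves stay remote.
metas = ray.get([meta_ref for _, meta_ref in pairs])
print(sum(m["num_rows"] for m in metas))
```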
@@ -112,6 +112,10 @@ def for_block(block: Block) -> "BlockAccessor[T]":
            from ray.experimental.data.impl.arrow_block import \
                ArrowBlockAccessor
            return ArrowBlockAccessor(block)
        elif isinstance(block, bytes):
Can you also change the definition of the Block type to include bytes?
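For concreteness, a minimal sketch of the decode step a bytes case could perform (the helper name is hypothetical; the pyarrow.ipc calls follow the PR description at the bottom of this page):

```python
import pyarrow
import pyarrow.ipc

def table_from_stream_bytes(block: bytes) -> "pyarrow.Table":
    # A block produced on the Java side arrives as Arrow IPC streaming bytes;
    # decode it back into a pyarrow.Table so the existing ArrowBlockAccessor
    # can wrap it like any other Arrow block.
    reader = pyarrow.ipc.open_stream(block)
    return reader.read_all()
```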
Yes, only the metadata is collected to one node. But the data would be made local on their own nodes, right? In this part no communication is involved, but how about time and memory overhead? Is it negligible? If not, maybe we can use a whole Arrow table for the metadata, and this can be written in Java.
Hmm, you could always leave the metadata blank. It's preferable to fill it out if possible, but maybe that can be an option if it incurs extra performance overhead.
Hi @ericl , I've been benchmarking our udf
@stephanie-wang @rkooo567 any suggestions on how to handle the to_spark() issue above with serializing the object ref?
Are you saying the object is transferred to the node where it calls cloudpickle.dumps?
Sorry, it turns out my timing went wrong 😭
@kira-lin sounds like the issue was resolved, is that right? Any other blocking issues?
Hi @ericl , we have been optimizing to_spark's performance these past weeks. One problem is that if the block type is pyarrow.Table, we cannot fetch it in Java directly; we need to first fetch it in Python and then send it to the JVM through a udf. This is not optimal. We want to be able to fetch the data directly in Java, which means the block type should be the Arrow streaming format (bytes). For now we could keep two implementations for the two types. What do you think?
I see, that makes sense. Would you just add a case for 'bytes' to BlockAccessor.for_block()? I'm wondering if it makes sense to always use the cross-language format and not pyarrow.Table at all.
Sure.
I think having these two types might confuse users. But I'm not sure about the overhead now, and I'm wondering whether it is necessary to use the cross-language format. Does Plasma have any optimization for such cases?
By cross-language format, I just meant always using the bytes format.
Yeah, what I wanted to say is that using the streaming format (bytes) introduces some overhead. If we could directly read/write the Arrow standard columnar format with Plasma, then no conversions would be needed.
I see, why not use the random-access format then? There should be no overhead with that one.
I'm not sure, but Spark uses the streaming format to communicate between Python and the JVM. I guess we'll just use this for now, and update whenever optimization is possible.
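For reference, a small self-contained sketch contrasting the two Arrow IPC layouts under discussion; only public pyarrow APIs are used, and the tiny table is made up for illustration:

```python
import pyarrow as pa
import pyarrow.ipc as ipc

# A tiny table standing in for one dataset block.
table = pa.table({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

# Streaming format: the layout Spark uses between the JVM and Python workers.
stream_sink = pa.BufferOutputStream()
with ipc.new_stream(stream_sink, table.schema) as writer:
    writer.write_table(table)
stream_bytes = stream_sink.getvalue()
restored_stream = ipc.open_stream(stream_bytes).read_all()

# Random-access (file) format: adds a footer so individual record batches
# can be located without scanning the whole payload.
file_sink = pa.BufferOutputStream()
with ipc.new_file(file_sink, table.schema) as writer:
    writer.write_table(table)
file_bytes = file_sink.getvalue()
restored_file = ipc.open_file(file_bytes).read_all()

assert restored_stream.equals(table) and restored_file.equals(table)
```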
def get_owner_address(self, ObjectRef object_ref):
    cdef:
        CObjectID c_object_id = object_ref.native()
    return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
        c_object_id).SerializeAsString()
This function is added because the locations of objects need to be passed to Java, so that we can ensure locality in Spark.
Are you calling this from RayDP? This isn't a public API, so we should call it only from within Ray code (and pass the addresses to external libraries).
Is this comment addressed?
Well, I added the developer API; I'm going to call it directly in to_spark now.
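Roughly what that call site could look like, purely as a sketch (the global_worker/core_worker access path is Ray-internal and the helper name is hypothetical, not part of this diff):

```python
import ray

def block_owner_addresses(block_refs):
    # Hypothetical helper: collect the serialized owner address of every
    # block ref so the Spark/Java side can schedule its reads close to
    # where the data actually lives.
    worker = ray.worker.global_worker
    return [worker.core_worker.get_owner_address(ref) for ref in block_refs]
```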
Hi @ericl , I'm wondering what happens when […]. Maybe we should create a Plasma buffer and use a StreamWriter to write into it? I'm not sure whether this is better than just writing to bytes. BTW, since RecordBatch is for contiguous memory, I guess we should use it instead of Table for each block?
There's no such thing as "Ray vs. Arrow serialization" here; Ray supports pickle5 serialization natively. Hence we use the serializer defined by pyarrow.Table, and it writes to Ray shared memory via the pickle5 protocol, so the pickling of pyarrow.Table is already optimal in performance.
A Table is just a convenience wrapper around a list of RecordBatches; it's the same thing.
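To make the point concrete, a small standalone check of protocol-5 pickling of a pyarrow.Table (whether the column buffers are actually handed over out-of-band depends on the pyarrow version; the round trip works either way):

```python
import pickle
import pyarrow as pa

table = pa.table({"x": list(range(1000))})

# With protocol 5, pyarrow can expose its column buffers out-of-band via
# buffer_callback, which is what lets Ray place them straight into shared
# memory instead of copying them into the pickle byte stream.
out_of_band = []
payload = pickle.dumps(table, protocol=5, buffer_callback=out_of_band.append)
restored = pickle.loads(payload, buffers=out_of_band)
assert restored.equals(table)
```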
@ericl I'm not sure whether the tests I added actually get run; can you please take a look? Where should I install raydp for testing?
python/requirements.txt
Outdated
@@ -59,6 +59,8 @@ moto
 mypy
 networkx
 numba
 raydp-nightly; platform_system != "Windows"
@clarkzinzow @iycheng could you provide the right place to add this for dataset tests?
//python/ray/data:tests/test_raydp_dataset PASSED in 35.5s. Seems it worked?
Yes, I just moved raydp to /python/requirements/data_processing/requirements.txt, which is installed only for that test. Hope it still works.
python/ray/data/read_api.py
Outdated
        df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray).
        parallelism: The amount of parallelism to use for the dataset.
            If not provided, it will be equal to the number of partitions of the
indent
python/ray/data/read_api.py
Outdated
def from_arrow(tables: List[ObjectRef["pyarrow.Table"]]) -> Dataset[ArrowRow]:
def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]],
               *,
               parallelism: int = 200) -> Dataset[ArrowRow]:
Remove? This doesn't seem to be used.
python/ray/data/read_api.py
Outdated
        dfs: A list of Ray object references to Arrow tables.
        tables: A list of Ray object references to Arrow tables,
            or their streaming format in bytes.
        parallelism: The amount of parallelism to use for the dataset.
Remove?
LGTM with minor comments.
Co-authored-by: Eric Liang <ekhliang@gmail.com>
def spark_dataframe_to_ray_dataset(df: sql.DataFrame, parallelism: int = 0):
E   TypeError: '<' not supported between instances of 'NoneType' and 'int'
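One plausible guard for that failure, shown only as a sketch (the real fix lives in RayDP's spark_dataframe_to_ray_dataset, which isn't part of this diff): fall back to the dataframe's partition count when no usable parallelism is passed, so later comparisons never see None.

```python
from pyspark.sql import DataFrame

def resolve_parallelism(df: DataFrame, parallelism=None) -> int:
    # If the caller passed None (or a non-positive value), use the number of
    # Spark partitions instead of letting None leak into an int comparison.
    if not parallelism or parallelism <= 0:
        return df.rdd.getNumPartitions()
    return parallelism
```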
Also LINT error.
All fixed, @ericl
Why are these changes needed?
@ericl Thanks for adding from_arrow, which makes from_spark much easier and faster. It currently only accepts List[ObjectRef[pyarrow.Table]]; however, in RayDP, as data is written to Ray in Java, the type behind the ObjectRef is bytes. It can be read into a pyarrow.Table by s = pyarrow.ipc.open_stream(data); tb = s.read_all(). I added this to the for_block function.
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.
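For context, a hedged end-to-end sketch of how the new APIs are meant to be used together with RayDP (the init_spark arguments are illustrative, and the exact signatures follow the current docs, which may differ slightly from this PR's revision):

```python
import ray
import raydp

ray.init()

# Start a Spark session on top of the Ray cluster via RayDP.
spark = raydp.init_spark(
    app_name="from_spark_demo",
    num_executors=2,
    executor_cores=2,
    executor_memory="2GB",
)

# Build a Spark dataframe, hand it to Ray Datasets without leaving the
# cluster, then convert the dataset back into a Spark dataframe.
df = spark.range(0, 1000)
ds = ray.data.from_spark(df)
df2 = ds.to_spark(spark)
```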