
[Dataset] implement from_spark, to_spark and some optimizations #17340

Merged
merged 24 commits into ray-project:master on Sep 9, 2021

Conversation

kira-lin
Contributor

Why are these changes needed?

@ericl Thanks for adding from_arrow, which makes from_spark much easier and faster. However, it currently only accepts List[ObjectRef[pyarrow.Table]]; in RayDP, data is written to Ray from Java, so the type of each ObjectRef is bytes. Such a block can be read into a pyarrow.Table with s = pyarrow.ipc.open_stream(data); tb = s.read_all(). I added this handling to the for_block function (a sketch of the conversion is shown below).
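
For reference, a minimal sketch of that conversion (hypothetical helper name; `data` stands in for the bytes written to the Ray object store from Java):

import pyarrow

def bytes_to_table(data: bytes) -> "pyarrow.Table":
    # Parse the Arrow IPC streaming format and materialize all
    # record batches as a single Table.
    s = pyarrow.ipc.open_stream(data)
    return s.read_all()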

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@kira-lin
Contributor Author

Besides, we still have some overhead after the data is written to Ray, due to collecting metadata. I have a few questions:

  1. We store the schema in the metadata of each block, but a Spark dataframe has a uniform schema. Is this necessary? Similarly for input_files.
  2. In the current implementation, we need to ray.get all of the blocks just to get the metadata. This is probably not ideal. In some cases it might not make a difference, because the data needs to be fetched eventually anyway; but if the dataset is huge, it might cause too much memory pressure.

@ericl ericl self-assigned this Jul 26, 2021
@ericl
Contributor

ericl commented Jul 26, 2021

We store schema in metadata for each block. But a spark dataframe has a uniform schema. Is this necessary? Similarly for input_files.

Yeah, it's a good question. One advantage of keeping input files per block is that if the dataset is split, we can still show the input files of each split accurately. It also avoids the need for both global and per-block metadata. But I'm open to adding global metadata if storing it per block is overly redundant.

In the current implementation, we need to ray.get all of the blocks just to get the metadata. This is probably not ideal. In some cases, it might not make difference because anyway data needs to be fetched eventually. But if the dataset is huge, it might cause too much memory pressure.

Note that you only need to ray.get() the metadata, which is quite small. The actual blocks do not need to be fetched.
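
To illustrate this point with a hypothetical, self-contained example (not code from this PR): small metadata objects can be fetched without transferring the blocks they describe.

import ray

ray.init()

# A large "block" and a small metadata record, stored separately.
block_ref = ray.put(b"x" * 10_000_000)  # stays in the object store
meta_ref = ray.put({"num_rows": 1000, "size_bytes": 10_000_000})

# Fetching only the metadata is cheap; the block itself is not
# transferred until someone calls ray.get(block_ref).
print(ray.get(meta_ref)["num_rows"])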

@@ -112,6 +112,10 @@ def for_block(block: Block) -> "BlockAccessor[T]":
            from ray.experimental.data.impl.arrow_block import \
                ArrowBlockAccessor
            return ArrowBlockAccessor(block)
        elif isinstance(block, bytes):
Contributor

Can you also change the definition of the Block type to include bytes?

@ericl ericl added the @author-action-required label Jul 26, 2021
@kira-lin
Contributor Author

Note that you only need to ray.get() the metadata, which is quite small. The actual blocks do not need to be fetched.

Yes, only the metadata is collected to one node. But the data would still be made local on its own node, right? No communication is involved in this part, but what about the time and memory overhead? Is it negligible?

If not, maybe we can use a whole Arrow table for the metadata, and that could be written from Java.

@ericl
Contributor

ericl commented Jul 27, 2021

Hmm, you could always leave the metadata blank. It's preferable to fill it out if possible, but maybe that can be an option if it incurs extra performance overhead.

@ericl ericl added this to the Datasets Beta milestone Jul 30, 2021
@kira-lin
Contributor Author

kira-lin commented Aug 4, 2021

Hi @ericl , I've been benchmarking our UDF-based to_spark implementation, and I have a few problems:

  1. Logging/printing in the UDF seems buggy. We need to connect to the Ray cluster inside the UDF function (which is executed in other processes) via either ray.init or the Ray client, and both print a single log line far too many times. I was able to work around this bug by writing the information to a named actor.
  2. Our current approach for to_spark is to first create a Spark dataframe containing the object refs in Ray, and then map this dataframe using a UDF in which each object is retrieved and converted. You can refer to here. The problem is that when we use ray.cloudpickle.dumps to serialize the object refs so that we can put them into the dataframe (and retrieve them in the UDF), the address of the object seems to change to the node where cloudpickle.dumps is run. Is that right? We want as few remote reads as possible. How can we solve this problem? (A simplified sketch of this round trip is shown below.)
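
A simplified, single-process sketch of the round trip described in item 2 (in the real implementation, the loads/get side runs inside the Spark UDF on other nodes):

import ray
from ray import cloudpickle

ray.init()

ref = ray.put({"payload": 42})

# Serialize the ObjectRef so it can be stored in a Spark dataframe column.
ref_bytes = cloudpickle.dumps(ref)

# Inside the UDF (another process attached to the same Ray cluster),
# the ref is deserialized and the object is fetched.
restored = cloudpickle.loads(ref_bytes)
print(ray.get(restored))  # {'payload': 42}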

@ericl
Contributor

ericl commented Aug 4, 2021

@stephanie-wang @rkooo567 any suggestions on how to handle the to_spark() issue above with serializing the object ref?

@rkooo567
Contributor

rkooo567 commented Aug 4, 2021

it seems like the address of the object changes to the node where cloudpickle.dumps is run.

Are you saying the object is transferred to the node where it calls cloudpickle.dumps?

@kira-lin
Contributor Author

kira-lin commented Aug 5, 2021

Sorry, it turns out my timing measurement was wrong 😭

@ericl
Contributor

ericl commented Aug 13, 2021

@kira-lin sounds like the issue was resolved, is that right? Any other blocking issues?

@kira-lin
Contributor Author

Hi @ericl , we have been optimizing to_spark's performance over the past few weeks. One problem is that if the block type is pyarrow.Table, we cannot fetch it in Java directly; we need to first fetch it in Python and then send it to the JVM through a UDF, which is not optimal. We want to be able to fetch the data in Java directly, which means the block type should be the Arrow IPC streaming format (bytes). For now we can keep these two implementations for the two types. What do you think? (A sketch of producing the streaming format is shown below.)
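
For reference, a minimal sketch (not code from this PR) of writing a table in the Arrow IPC streaming format, whose bytes can then be parsed by Arrow's Java implementation:

import pyarrow

table = pyarrow.table({"a": [1, 2, 3]})

# Write the table in the IPC streaming format into an in-memory buffer.
sink = pyarrow.BufferOutputStream()
with pyarrow.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
stream_bytes = sink.getvalue().to_pybytes()  # bytes readable from Java as well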

@ericl
Contributor

ericl commented Aug 16, 2021 via email

@kira-lin
Contributor Author

kira-lin commented Aug 16, 2021

Would you just add a case for 'bytes' to BlockAccessor.for_block()?

Sure.

I'm wondering if it makes sense to always use the cross language format and not pyarrow.Table at all

I think having these two types might confuse users, but I'm not sure about the overhead right now. I'm wondering whether using the cross-language format is necessary. Does Plasma have any optimization for such cases?

@ericl
Contributor

ericl commented Aug 16, 2021 via email

@kira-lin
Contributor Author

Yeah, what I wanted to say is that using the streaming format (bytes) introduces some overhead. If we could directly read/write the Arrow standard columnar format in Plasma, no conversions would be needed.

@ericl
Contributor

ericl commented Aug 16, 2021 via email

@kira-lin
Contributor Author

I'm not sure, but Spark uses the streaming format to communicate between Python and the JVM. I guess we'll just use this for now and update whenever an optimization becomes possible.

@kira-lin kira-lin marked this pull request as ready for review August 17, 2021 08:33
Comment on lines +1670 to +1674
    def get_owner_address(self, ObjectRef object_ref):
        cdef:
            CObjectID c_object_id = object_ref.native()
        return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
            c_object_id).SerializeAsString()
Contributor Author

This function is added because the locations of objects need to be passed to Java, so that we can ensure locality in Spark.

Contributor

Are you calling this from RayDP? This isn't a public API, so we should call it only from within Ray code (and pass the addresses to external libraries).

Contributor

Is this comment addressed?

Contributor Author

Well, I added the developer API; I'm going to call it directly in to_spark now.

@kira-lin
Contributor Author

kira-lin commented Aug 19, 2021

Hi @ericl , I'm wondering what happens when a pyarrow.Table is put into the Ray object store. Normally objects are serialized, but how does a pyarrow.Table get serialized? From my understanding, serializing an Arrow Table/RecordBatch just means writing it in the IPC stream/file format (this is what pyarrow.RecordBatch.serialize does, though it is now deprecated). In other words, the block type of the dataset would still be pyarrow.Table, but we would use Arrow's serialization instead of Ray's.

Maybe we should create a Plasma buffer and use StreamWriter to write into it? I'm not sure whether this is better than just writing to bytes.

BTW, since a RecordBatch is contiguous in memory, I guess we should use it instead of Table for each block?

@ericl
Contributor

ericl commented Aug 19, 2021

In other words, the block type of the dataset is still pyarrow.Table, but we use arrow's serialization instead of ray's.

There's no such thing as "Ray vs. Arrow serialization" here; Ray supports pickle5 serialization natively. We therefore use the serializer defined by pyarrow.Table, which writes to Ray shared memory via the pickle5 protocol. Hence, pickling a pyarrow.Table is already optimal in performance.

BTW, as RecordBatch is for contiguous memory, I guess we should use it instead of Table for each block?

A Table is just a convenience wrapper around a list of RecordBatches; it's the same thing.
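
A small illustration of that relationship (hypothetical example, not from the PR):

import pyarrow

batch = pyarrow.RecordBatch.from_arrays([pyarrow.array([1, 2, 3])], names=["a"])
table = pyarrow.Table.from_batches([batch, batch])

# The Table wraps the two batches without copying their data.
print(table.num_rows)           # 6
print(len(table.to_batches()))  # 2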

@kira-lin
Contributor Author

kira-lin commented Sep 6, 2021

@ericl I'm not sure whether the tests I added are actually being run; can you please take a look? Where should I install raydp for testing?

@@ -59,6 +59,8 @@ moto
mypy
networkx
numba
raydp-nightly; platform_system != "Windows"
Contributor

@clarkzinzow @iycheng could you provide the right place to add this for dataset tests?

@ericl
Contributor

ericl commented Sep 7, 2021

//python/ray/data:tests/test_raydp_dataset PASSED in 35.5s

Seems it worked?

@kira-lin
Contributor Author

kira-lin commented Sep 7, 2021

Seems it worked?

Yes, I just moved raydp to /python/requirements/data_processing/requirements.txt, which is installed only in that test. Hope it still works.

        df: A Spark dataframe, which must be created by RayDP (Spark-on-Ray).
        parallelism: The amount of parallelism to use for the dataset.
            If not provided, it will be equal to the number of partitions of the
Contributor

indent

-def from_arrow(tables: List[ObjectRef["pyarrow.Table"]]) -> Dataset[ArrowRow]:
+def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]],
+               *,
+               parallelism: int = 200) -> Dataset[ArrowRow]:
Contributor

Remove? This doesn't seem to be used.

-    dfs: A list of Ray object references to Arrow tables.
+    tables: A list of Ray object references to Arrow tables,
+        or their streaming format in bytes.
     parallelism: The amount of parallelism to use for the dataset.
Contributor

Remove?

@ericl ericl left a comment (Contributor)

LGTM with minor comments.

kira-lin and others added 2 commits September 8, 2021 10:29
Co-authored-by: Eric Liang <ekhliang@gmail.com>
@ericl
Contributor

ericl commented Sep 8, 2021

def spark_dataframe_to_ray_dataset(df: sql.DataFrame, parallelism: int = 0):
    num_part = df.rdd.getNumPartitions()
    if parallelism in (0, num_part):
        pass
    elif parallelism < num_part:

E   TypeError: '<' not supported between instances of 'NoneType' and 'int'
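
One plausible reading of the traceback is that the test passes parallelism=None, which the comparison does not handle. A hedged sketch of a guard (names follow the quoted snippet; this is not necessarily the fix that was committed):

def spark_dataframe_to_ray_dataset(df, parallelism=None):
    num_part = df.rdd.getNumPartitions()
    # Treat None like the "use the dataframe's own partitioning" default.
    if parallelism is None or parallelism in (0, num_part):
        parallelism = num_part
    elif parallelism < num_part:
        ...  # repartition down, as in the original implementation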

@ericl
Contributor

ericl commented Sep 8, 2021

Also LINT error.

@kira-lin
Contributor Author

kira-lin commented Sep 9, 2021

All fixed, @ericl

@ericl ericl merged commit 2fcd1bc into ray-project:master Sep 9, 2021
Labels
@author-action-required size-medium

4 participants