Add initial support for Ray dataset #166
Conversation
@ericl, @wuisawesome, do you have any suggestions on how to write an ArrowBlock to the object store in Java? Currently what we can do is to write a …
@carsonwang I filed this PR to support that, would it work for your use case? ray-project/ray#17186 For now, maybe making a memory copy is the way to go. We can add this as a TODO.
@ericl, thanks a lot! That will be very helpful.
Implements RDD to ensure locality
# TODO: how to branch on type of block?
sample = ray.get(blocks[0])
@ericl, how should we branch on the type of a block? Maybe we can save it in the metadata or the BlockList class?
I think we should guarantee the block type in the dataset code, or pass in the type explicitly in the call here.
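The sampling approach in the snippet above can be sketched as a small dispatch helper. This is only an illustration of the branch-on-type idea from this thread, not the actual dataset code: the assumed block kinds (an Arrow table or a pandas DataFrame) come from the discussion, and the type check is done lazily on qualified names so the sketch does not require pyarrow at import time.

```python
def infer_block_type(sample):
    """Classify a materialized block (e.g. sample = ray.get(blocks[0])).

    Hypothetical sketch: dispatch on the module of the sample's type,
    assuming blocks are either pyarrow Tables or pandas DataFrames.
    """
    qualname = f"{type(sample).__module__}.{type(sample).__name__}"
    if qualname.startswith("pyarrow"):
        return "arrow"
    if qualname.startswith("pandas"):
        return "pandas"
    raise TypeError(f"unsupported block type: {qualname}")
```

As the reply above notes, guaranteeing the block type in the dataset code (or passing it in explicitly) avoids the cost of materializing a sample block at all.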
driver_cp_key = "spark.driver.extraClassPath"
driver_cp = ":".join(glob.glob(RAYDP_CP) + glob.glob(RAY_CP))
if driver_cp_key in extra_conf:
    extra_conf[driver_cp_key] = driver_cp + ":" + extra_conf[driver_cp_key]
else:
    extra_conf[driver_cp_key] = driver_cp
We need to add RAY_CP to Spark's driver extra classpath because the serialized owner_address is needed to register ownership in Java, but we also need to parse it to extract the IP address for use in Spark. This might not be needed once we use a cross-language call to start our RayAppMaster; in that case we may not need to register ownership(?)
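The IP-extraction step mentioned above could look roughly like the helper below. Note the heavy caveat: Ray's actual serialized owner_address is a protobuf message, so real code would decode that first; this sketch assumes a plain "ip:port" string purely for illustration, and the function name is hypothetical.

```python
def node_ip_from_address(address: str) -> str:
    """Hypothetical helper: pull the node IP out of an owner address.

    Assumes a plain "ip:port" string; Ray's real serialized
    owner_address is a protobuf and would need decoding first.
    """
    ip, sep, _port = address.rpartition(":")
    return ip if sep else address
```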
python/raydp/spark/dataset.py
Outdated
locations = []
for block in blocks:
    # address is needed for locality
    locations.append(ray.worker.global_worker.core_worker.get_owner_address(block))
Will get_owner_address return multiple locations if the block is stored on more than one node?
This isn't a public API of Ray. If you really need it, you should call it from ray/data code and pass in the addresses here as an optional hint.
That way raydp isn't depending on private Ray APIs.
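The optional-hint pattern suggested above can be sketched as follows. All names here are hypothetical; the point is only that callers with access to owner addresses (e.g. ray/data code) pass them in, while raydp itself falls back to no locality preference instead of calling private Ray APIs.

```python
def resolve_locations(blocks, location_hints=None):
    """Hypothetical sketch of the optional-hint pattern.

    If the caller supplies one owner address per block, use them;
    otherwise return no preference and let Spark schedule freely.
    """
    if location_hints is not None:
        if len(location_hints) != len(blocks):
            raise ValueError("expected one location hint per block")
        return location_hints
    return [None] * len(blocks)  # no preference
```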
I have added a developer API to Ray. For now, addresses are needed to register ownership in Java.
python/raydp/spark/dataset.py
Outdated
blocks_df = spark.createDataFrame(ref_list, schema)
return blocks_df.mapInPandas(_convert_blocks_to_dataframe, schema)

def _convert_by_rdd(spark: sql.SparkSession, blocks: Dataset, schema: StructType) -> DataFrame:
Is blocks a List[ObjectRef] instead of Dataset?
Yes, I called get_blocks on the dataset. Actually, it should be List[ObjectRef[Block]].
words_df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])
ds = raydp.spark.spark_dataframe_to_ray_dataset(words_df)
df = raydp.spark.ray_dataset_to_spark_dataframe(spark, ds)
assert words_df.toPandas().equals(df.toPandas())
Can we add a corresponding unit test on the Ray side to verify it isn't breaking raydp?
dfs.append(data.to_pandas())
yield pd.concat(dfs)

def _convert_by_udf(spark: sql.SparkSession,
When converting by UDF, we also want to use the customized RDD so that we can have data locality.
@@ -61,6 +61,12 @@ def test_spark_driver_and_executor_hostname(spark_on_ray_small):
    driver_bind_address = conf.get("spark.driver.bindAddress")
    assert node_ip_address == driver_bind_address

def test_ray_dataset_from_and_to_spark(spark_on_ray_small):
Can you also add a test that creates a Ray Dataset directly and then converts it to a Spark dataframe?
python/raydp/spark/dataset.py
Outdated
schema: StructType) -> DataFrame:
    s = StructType([StructField("ref", BinaryType(), False)])
    ref_list = [(ray.cloudpickle.dumps(block),) for block in blocks]
    blocks_df = spark.createDataFrame(ref_list, schema)
Can we first create an RDD for ref_list here, solving the locality issue and specifying the correct partition number?
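A minimal sketch of that suggestion, assuming a pyspark SparkSession: building the RDD explicitly via parallelize pins the partition count (one partition per block ref here) instead of letting createDataFrame pick it. The function name is hypothetical, and locality preferences would still need the custom RDD discussed elsewhere in this PR; this sketch only addresses the partition number.

```python
def refs_to_dataframe(spark, ref_list, schema, num_partitions=None):
    # Default to one partition per serialized block ref so that the
    # downstream mapInPandas sees one block per partition.
    n = num_partitions or max(len(ref_list), 1)
    rdd = spark.sparkContext.parallelize(ref_list, n)
    return spark.createDataFrame(rdd, schema)
```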
The two PRs depend on each other to pass the tests, which is a deadlock. I have tested this in my local environment, and the newly added tests are also in the Ray PR, so they will be covered there. I'm merging this now and will update later if needed.
ObjectId id = new ObjectId(obj);
ObjectRefImpl<T> ref = new ObjectRefImpl<>(id, clazz);
((RayRuntimeInternal) Ray.internal()).getObjectStore()
    .registerOwnershipInfoAndResolveFuture(id, null, ownerAddress);
registerOwnershipInfoAndResolveFuture is already invoked in the constructor of ObjectRefImpl. I don't think we need to call it again here.
This PR has to use the Ray nightly build.