
Add initial support for Ray dataset #166


Merged: 18 commits merged into oap-project:master from the ray-dataset branch on Sep 2, 2021

Conversation

@kira-lin (Collaborator) commented Jul 9, 2021

This PR has to use Ray nightly.

@carsonwang (Collaborator)

@ericl, @wuisawesome, do you have any suggestions on how to write an ArrowBlock to the object store in Java? Currently what we can do is write a pyarrow.Table to the object store in Java. If we cannot easily write an ArrowBlock to the object store in Java, is it possible to change the definition of ArrowBlock to use a ref to a pyarrow.Table? That way both Python and Java could write a pyarrow.Table to the object store and then create an ArrowBlock.
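
To make the proposal concrete, here is a hypothetical Python sketch; RefArrowBlock is an illustrative name, not an existing Ray class:

    from dataclasses import dataclass

    import pyarrow as pa
    import ray

    @dataclass
    class RefArrowBlock:
        # hypothetical: an ArrowBlock variant that stores a ref to a
        # pyarrow.Table instead of the table data itself
        table_ref: ray.ObjectRef

        def table(self) -> pa.Table:
            return ray.get(self.table_ref)

    ray.init()
    # putting a plain pyarrow.Table works from Python today; under this
    # proposal it would be the only operation Java needs to support too
    table_ref = ray.put(pa.table({"word": ["look", "spark"]}))
    block = RefArrowBlock(table_ref)
    assert block.table().num_rows == 2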

@ericl commented Jul 19, 2021

@carsonwang I filed this PR to support that; would it work for your use case? ray-project/ray#17186

For now, maybe making a memory copy is the way to go. We can add this as a TODO.

@carsonwang (Collaborator)

@ericl, thanks a lot! That will be very helpful.

Comment on lines +506 to +507

    # TODO: how to branch on type of block?
    sample = ray.get(blocks[0])

@kira-lin (Collaborator, Author):

@ericl, how do we branch on the type of block? Maybe we can save it in the metadata or the BlockList class?

Reply:

I think we should guarantee the block type in the dataset code, or pass in the type explicitly in the call here.
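
A hypothetical shape for the explicit-type option (the helper name and its expected_type parameter are illustrative, not from the PR):

    import pyarrow as pa
    import ray

    def _sample_block(blocks, expected_type=pa.Table):
        # hypothetical helper: fetch one block and validate its type
        # explicitly rather than branching on whatever ray.get returns
        sample = ray.get(blocks[0])
        if not isinstance(sample, expected_type):
            raise TypeError(f"expected {expected_type}, got {type(sample)}")
        return sample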

Comment on lines +70 to +75

    driver_cp_key = "spark.driver.extraClassPath"
    driver_cp = ":".join(glob.glob(RAYDP_CP) + glob.glob(RAY_CP))
    if driver_cp_key in extra_conf:
        extra_conf[driver_cp_key] = driver_cp + ":" + extra_conf[driver_cp_key]
    else:
        extra_conf[driver_cp_key] = driver_cp

@kira-lin (Collaborator, Author):

We need to add this RAY_CP to Spark's driver extra classpath because the serialized owner_address is needed to register ownership in Java, but we need to parse it to extract the ip_address to use in Spark. This might not be needed once we use a cross-language call to start our RayAppMaster; in that case we wouldn't need to register ownership(?)
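
For context, a hedged sketch of how the merged classpath reaches Spark; the configs parameter of raydp.init_spark is taken from the raydp README, and the extra jar path is hypothetical:

    import raydp

    spark = raydp.init_spark(
        app_name="ray-dataset-demo",
        num_executors=1,
        executor_cores=1,
        executor_memory="1G",
        # user-supplied entries get the raydp/ray jars prepended by the code above
        configs={"spark.driver.extraClassPath": "/opt/extra/jars/*"},
    )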

    locations = []
    for block in blocks:
        # address is needed for locality
        locations.append(
            ray.worker.global_worker.core_worker.get_owner_address(block))

Collaborator:

Will get_owner_address return multiple locations if the block is stored on more than one node?

Reply:

This isn't a public API of Ray. If you really need it, you should call it from ray/data code and pass in the addresses here as an optional hint.

That way raydp doesn't depend on private Ray APIs.
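
One hypothetical way to structure that suggestion (the helper and its locations parameter are illustrative):

    from typing import List, Optional

    import ray

    def _get_locations(blocks, locations: Optional[List[bytes]] = None) -> List[bytes]:
        # prefer owner addresses passed in from ray/data code; fall back
        # to the private core_worker API only when no hint was provided
        if locations is not None:
            return locations
        core_worker = ray.worker.global_worker.core_worker  # private API
        return [core_worker.get_owner_address(b) for b in blocks]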

@kira-lin (Collaborator, Author):

I have added a developer API to Ray. For now, the addresses are needed to register ownership in Java.

    blocks_df = spark.createDataFrame(ref_list, s)
    return blocks_df.mapInPandas(_convert_blocks_to_dataframe, schema)

def _convert_by_rdd(spark: sql.SparkSession, blocks: Dataset, schema: StructType) -> DataFrame:

Collaborator:

Is blocks a List[ObjectRef] instead of Dataset?

@kira-lin (Collaborator, Author):

Yes, I called get_blocks on the dataset. Actually it should be List[ObjectRef[Block]].
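
For clarity, the corrected annotation from this thread might look like the following sketch; the import path for Block is an assumption that varies across Ray versions:

    from typing import List

    from pyspark import sql
    from pyspark.sql import DataFrame
    from pyspark.sql.types import StructType
    from ray.data.block import Block  # assumed module path
    from ray.types import ObjectRef

    def _convert_by_rdd(spark: sql.SparkSession,
                        blocks: List[ObjectRef[Block]],
                        schema: StructType) -> DataFrame:
        ...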

    words_df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])
    ds = raydp.spark.spark_dataframe_to_ray_dataset(words_df)
    df = raydp.spark.ray_dataset_to_spark_dataframe(spark, ds)
    assert words_df.toPandas().equals(df.toPandas())

Reply:

Can we add a corresponding unit test on the Ray side so that it doesn't break raydp?

        dfs.append(data.to_pandas())
    yield pd.concat(dfs)

def _convert_by_udf(spark: sql.SparkSession,

Collaborator:

When converting by UDF, we also want to utilize the customized RDD so that we can have data locality.

@@ -61,6 +61,12 @@ def test_spark_driver_and_executor_hostname(spark_on_ray_small):
    driver_bind_address = conf.get("spark.driver.bindAddress")
    assert node_ip_address == driver_bind_address

def test_ray_dataset_from_and_to_spark(spark_on_ray_small):

Collaborator:

Can you also add a test that creates a Ray Dataset directly and then converts it to a Spark DataFrame?
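
A hedged sketch of such a test; ray.data.from_items is assumed to be available in the Ray nightly this PR depends on:

    import ray
    import raydp

    def test_ray_dataset_created_directly_to_spark(spark_on_ray_small):
        spark = spark_on_ray_small
        ds = ray.data.from_items([{"word": w} for w in ["look", "spark"]])
        df = raydp.spark.ray_dataset_to_spark_dataframe(spark, ds)
        assert df.count() == 2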

                    schema: StructType) -> DataFrame:
    s = StructType([StructField("ref", BinaryType(), False)])
    ref_list = [(ray.cloudpickle.dumps(block),) for block in blocks]
    blocks_df = spark.createDataFrame(ref_list, s)

Collaborator:

Can we first create an RDD for ref_list here, solving the locality issue and specifying the correct partition number?
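
A minimal sketch under that suggestion, building on the snippet above and assuming one partition per block ref; true locality hints would still need a custom RDD on the JVM side:

    num_partitions = len(ref_list)
    ref_rdd = spark.sparkContext.parallelize(ref_list, num_partitions)
    blocks_df = spark.createDataFrame(ref_rdd, s)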

@kira-lin (Collaborator, Author) commented Sep 2, 2021

The two PRs depend on each other to pass the tests, which is a deadlock. I have tested this in my local environment, and the newly added tests are also in the Ray PR, so they will be exercised there. I'm merging this now and will update later if needed.

@kira-lin merged commit c97e68e into oap-project:master on Sep 2, 2021
@kira-lin deleted the ray-dataset branch on September 2, 2021 07:38

    ObjectId id = new ObjectId(obj);
    ObjectRefImpl<T> ref = new ObjectRefImpl<>(id, clazz);
    ((RayRuntimeInternal) Ray.internal()).getObjectStore()
        .registerOwnershipInfoAndResolveFuture(id, null, ownerAddress);

Reply:

registerOwnershipInfoAndResolveFuture is already invoked in the constructor of ObjectRefImpl. I don't think we should call it again here.
