Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataFrame] Handling zero length partitions #2210

Merged

Conversation

kunalgosar
Copy link
Contributor

@kunalgosar kunalgosar commented Jun 8, 2018

This filters out zero length partitions on getting col_partitions or row_partitions.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5930/
Test FAILed.

@kunalgosar kunalgosar force-pushed the zero_length_partitions branch from b970a4c to 1da7e03 Compare June 11, 2018 06:47
@kunalgosar kunalgosar changed the title [WIP] [DataFrame] Handling zero length partitions [DataFrame] Handling zero length partitions Jun 11, 2018
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6001/
Test FAILed.

empty_cols_mask = self._col_metadata._lengths > 0
if any(empty_cols_mask):
self._col_metadata._lengths \
= self._col_metadata._lengths[empty_cols_mask]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting is a bit weird. Move = to previous line to be consistent with line 200?

@@ -208,6 +209,9 @@ def apply_helper(df):
num_return_vals=len(new_df._block_partitions))
for block in new_df._block_partitions.T]).T
new_df.index = self._index
new_df._row_metadata = \
_IndexMetadata(new_df._block_partitions[:, 0],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concat from earlier constructs a new DataFrame with a new _IndexMetadata which gets overwritten here. Ideally, we could modify concat to pass this _indexMetadata to the DataFrame constructor, but I'm not sure if this is possible to do elegantly with the current code base.

No need to make a change to the PR, but would like to point this out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here is that the reindex that is applied on columns changes the partition structure and the old _IndexMetadata created on concat is invalid. I agree that duplicate work is being done here, but I'm not sure there is a nice solution.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning to rewrite the majority of this class sometime this week, both to make it more performant and more clean.

@kunalgosar kunalgosar force-pushed the zero_length_partitions branch from 1da7e03 to e30c4af Compare June 11, 2018 21:34
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6009/
Test FAILed.

@kunalgosar
Copy link
Contributor Author

Jenkins, retest this please

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6020/
Test FAILed.

@kunalgosar
Copy link
Contributor Author

Jenkins, retest this please

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6030/
Test FAILed.

@kunalgosar
Copy link
Contributor Author

Jenkins, retest this please

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6034/
Test FAILed.

@kunalgosar
Copy link
Contributor Author

Jenkins, retest this please

@devin-petersohn
Copy link
Member

Jenkins, retest this please.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6098/
Test PASSed.

Copy link
Member

@devin-petersohn devin-petersohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comment. Thanks for the fix!

@@ -208,6 +209,9 @@ def apply_helper(df):
num_return_vals=len(new_df._block_partitions))
for block in new_df._block_partitions.T]).T
new_df.index = self._index
new_df._row_metadata = \
_IndexMetadata(new_df._block_partitions[:, 0],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning to rewrite the majority of this class sometime this week, both to make it more performant and more clean.

all_empty = ray.get(_map_partitions(
lambda df: df.empty, self._row_partitions))
return False not in all_empty
return self._row_metadata._empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more efficient approach would be return len(self.columns) == 0 or len(self.rows) == 0. This would also simplify a lot of the code in _IndexMetadata.

@@ -48,6 +49,9 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
else:
lengths_oid = _build_col_widths.remote(dfs)
coord_df_oid = _build_coord_df.remote(lengths_oid, index)
self._empty = _check_empty.remote(dfs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are doing this twice, once for each of column_metadata and row_metadata.

@ray.remote
def _check_empty(dfs):
"""Check if all partitions are empty"""
return all(ray.get([_deploy_func.remote(_get_empty, d) for d in dfs]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this (or _get_empty if you make some of the other changes, but a more efficient way to implement something like this is:

return next((False for d in dfs if not ray.get(_deploy_func.remote(_get_empty, d))), True)

@kunalgosar kunalgosar force-pushed the zero_length_partitions branch from e30c4af to 5ccca59 Compare June 20, 2018 20:20
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6159/
Test FAILed.

@kunalgosar
Copy link
Contributor Author

Jenkins, retest this please.

1 similar comment
@devin-petersohn
Copy link
Member

Jenkins, retest this please.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6161/
Test PASSed.

@devin-petersohn devin-petersohn merged commit 6bf48f4 into ray-project:master Jun 20, 2018
royf added a commit to royf/ray that referenced this pull request Jun 22, 2018
* 'master' of https://github.com/ray-project/ray: (157 commits)
  Fix build failure while using make -j1. Issue 2257 (ray-project#2279)
  Cast locator with index type (ray-project#2274)
  fixing zero length partitions (ray-project#2237)
  Make actor handles work in Python mode. (ray-project#2283)
  [xray] Add error table and push error messages to driver through node manager. (ray-project#2256)
  addressing comments (ray-project#2210)
  Re-enable some actor tests. (ray-project#2276)
  Experimental: enable automatic GCS flushing with configurable policy. (ray-project#2266)
  [xray] Sets good object manager defaults. (ray-project#2255)
  [tune] Update Trainable doc to expose interface (ray-project#2272)
  [rllib] Add a simple REST policy server and client example (ray-project#2232)
  [asv] Pushing to s3 (ray-project#2246)
  [rllib] Remove need to pass around registry (ray-project#2250)
  Support multiple availability zones in AWS (fix ray-project#2177) (ray-project#2254)
  [rllib] Add squash_to_range model option (ray-project#2239)
  Mitigate randomly building failure: adding gen_local_scheduler_fbs to raylet lib. (ray-project#2271)
  [rllib] Refactor Multi-GPU for PPO (ray-project#1646)
  [rllib] Envs for vectorized execution, async execution, and policy serving (ray-project#2170)
  [Dataframe] Change pandas and ray.dataframe imports (ray-project#1942)
  [Java] Replace binary rewrite with Remote Lambda Cache (SerdeLambda) (ray-project#2245)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants