[DataFrame] Handling zero length partitions #2210
Test FAILed.
b970a4c to 1da7e03
Test FAILed.
python/ray/dataframe/dataframe.py
Outdated
empty_cols_mask = self._col_metadata._lengths > 0
if any(empty_cols_mask):
    self._col_metadata._lengths \
        = self._col_metadata._lengths[empty_cols_mask]
Formatting is a bit weird. Move the `=` to the previous line to be consistent with line 200?
python/ray/dataframe/groupby.py
Outdated
@@ -208,6 +209,9 @@ def apply_helper(df):
                num_return_vals=len(new_df._block_partitions))
            for block in new_df._block_partitions.T]).T
        new_df.index = self._index
        new_df._row_metadata = \
            _IndexMetadata(new_df._block_partitions[:, 0],
The `concat` from earlier constructs a new `DataFrame` with a new `_IndexMetadata`, which gets overwritten here. Ideally, we could modify `concat` to pass this `_IndexMetadata` to the `DataFrame` constructor, but I'm not sure if this is possible to do elegantly with the current code base.
No need to make a change to the PR, but I would like to point this out.
The issue here is that the reindex that is applied on columns changes the partition structure, so the old `_IndexMetadata` created on `concat` is invalid. I agree that duplicate work is being done here, but I'm not sure there is a nice solution.
I'm planning to rewrite the majority of this class sometime this week, both to make it more performant and more clean.
1da7e03 to e30c4af
Test FAILed.
Jenkins, retest this please
Test FAILed.
Jenkins, retest this please
Test FAILed.
Jenkins, retest this please
Test FAILed.
Jenkins, retest this please
Jenkins, retest this please.
Test PASSed.
Left a few comments. Thanks for the fix!
python/ray/dataframe/dataframe.py
Outdated
all_empty = ray.get(_map_partitions(
    lambda df: df.empty, self._row_partitions))
return False not in all_empty
return self._row_metadata._empty
A more efficient approach would be `return len(self.columns) == 0 or len(self.rows) == 0`. This would also simplify a lot of the code in `_IndexMetadata`.
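A minimal sketch of the suggestion above, using a hypothetical `FrameShape` stand-in rather than the real Ray DataFrame class. A frame is empty when either axis has length zero, so the check only needs the axis labels that are already known and never touches partition data. The comment refers to `self.rows`; the sketch assumes this means the row index.

```python
class FrameShape:
    """Hypothetical stand-in that only tracks axis labels (not Ray's class)."""

    def __init__(self, index, columns):
        self.index = list(index)
        self.columns = list(columns)

    @property
    def empty(self):
        # Empty if there are no columns or no rows; no partition is inspected.
        return len(self.columns) == 0 or len(self.index) == 0


no_rows = FrameShape(index=[], columns=["a", "b"])
no_cols = FrameShape(index=[0, 1], columns=[])
populated = FrameShape(index=[0], columns=["a"])
```

Here `no_rows.empty` and `no_cols.empty` are both true, while `populated.empty` is false.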
@@ -48,6 +49,9 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
            else:
                lengths_oid = _build_col_widths.remote(dfs)
            coord_df_oid = _build_coord_df.remote(lengths_oid, index)
            self._empty = _check_empty.remote(dfs)
We are doing this twice, once for each of `column_metadata` and `row_metadata`.
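One way to avoid the duplicated check, sketched with hypothetical names (`AxisMetadata`, `check_empty`, and the `empty` keyword are illustrative and not part of the PR): compute the flag once and hand it to both metadata objects. The sketch assumes both objects can share a single result, which may not hold if each is built from a different slice of the block partitions.

```python
calls = []  # records how many times the check actually runs


def check_empty(partitions):
    """Hypothetical local stand-in for a per-partition emptiness check."""
    calls.append(1)
    return all(len(p) == 0 for p in partitions)


class AxisMetadata:
    """Hypothetical metadata holder that accepts a precomputed flag."""

    def __init__(self, partitions, empty=None):
        # Only recompute when the caller did not supply the result.
        self.empty = check_empty(partitions) if empty is None else empty


partitions = [[], [1, 2], []]
empty = check_empty(partitions)          # computed once
row_meta = AxisMetadata(partitions, empty=empty)
col_meta = AxisMetadata(partitions, empty=empty)
```

After this runs, `calls` has a single entry even though two metadata objects were built.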
python/ray/dataframe/utils.py
Outdated
@ray.remote
def _check_empty(dfs):
    """Check if all partitions are empty"""
    return all(ray.get([_deploy_func.remote(_get_empty, d) for d in dfs]))
I don't think you need this (or `_get_empty`) if you make some of the other changes, but a more efficient way to implement something like this is:
return next((False for d in dfs if not ray.get(_deploy_func.remote(_get_empty, d))), True)
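The short-circuit behaviour of that expression can be seen without Ray. The sketch below swaps the remote call for a plain local function (`is_empty` is a hypothetical stand-in) and records which partitions were inspected. The generator form stops at the first non-empty partition, and `all(...)` over a generator behaves the same way.

```python
checked = []  # records every partition that was actually inspected


def is_empty(partition):
    """Hypothetical local stand-in for the remote emptiness check."""
    checked.append(partition)
    return len(partition) == 0


def all_empty(partitions):
    # Same shape as the reviewer's expression: yield False at the first
    # non-empty partition, otherwise fall back to True.
    return next((False for p in partitions if not is_empty(p)), True)


partitions = [[], [1], [], [2, 3]]
result = all_empty(partitions)
inspected = list(checked)

# Equivalent spelling with a generator passed to all().
checked.clear()
result_all = all(is_empty(p) for p in partitions)
inspected_all = list(checked)
```

Only the first two partitions are inspected in either form. Note the trade-off in the Ray version: calling `ray.get` inside the generator submits and waits on one task at a time, whereas the original list form submits every task before waiting, so early exit is gained at the cost of parallelism.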
e30c4af to 5ccca59
Test FAILed.
Jenkins, retest this please.
Jenkins, retest this please.
Test PASSed.
* 'master' of https://github.com/ray-project/ray: (157 commits)
  Fix build failure while using make -j1. Issue 2257 (ray-project#2279)
  Cast locator with index type (ray-project#2274)
  fixing zero length partitions (ray-project#2237)
  Make actor handles work in Python mode. (ray-project#2283)
  [xray] Add error table and push error messages to driver through node manager. (ray-project#2256)
  addressing comments (ray-project#2210)
  Re-enable some actor tests. (ray-project#2276)
  Experimental: enable automatic GCS flushing with configurable policy. (ray-project#2266)
  [xray] Sets good object manager defaults. (ray-project#2255)
  [tune] Update Trainable doc to expose interface (ray-project#2272)
  [rllib] Add a simple REST policy server and client example (ray-project#2232)
  [asv] Pushing to s3 (ray-project#2246)
  [rllib] Remove need to pass around registry (ray-project#2250)
  Support multiple availability zones in AWS (fix ray-project#2177) (ray-project#2254)
  [rllib] Add squash_to_range model option (ray-project#2239)
  Mitigate randomly building failure: adding gen_local_scheduler_fbs to raylet lib. (ray-project#2271)
  [rllib] Refactor Multi-GPU for PPO (ray-project#1646)
  [rllib] Envs for vectorized execution, async execution, and policy serving (ray-project#2170)
  [Dataframe] Change pandas and ray.dataframe imports (ray-project#1942)
  [Java] Replace binary rewrite with Remote Lambda Cache (SerdeLambda) (ray-project#2245)
  ...
This filters out zero-length partitions when getting `col_partitions` or `row_partitions`.
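As a rough illustration of the idea (not the PR's implementation), the sketch below drops partitions whose recorded length is zero and keeps the lengths list in step. `drop_zero_length` and the string partition handles are hypothetical.

```python
def drop_zero_length(partitions, lengths):
    """Keep only partitions whose recorded length is positive.

    Hypothetical helper: returns the surviving partitions and their lengths
    so the two lists stay aligned.
    """
    kept = [(p, n) for p, n in zip(partitions, lengths) if n > 0]
    return [p for p, _ in kept], [n for _, n in kept]


partitions = ["part0", "part1", "part2", "part3"]  # stand-ins for object IDs
lengths = [3, 0, 2, 0]
kept_parts, kept_lengths = drop_zero_length(partitions, lengths)
```

With these inputs, `kept_parts` holds `part0` and `part2`, and `kept_lengths` holds `3` and `2`.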