-
Notifications
You must be signed in to change notification settings - Fork 6.5k
[DataFrame] Improve performance of iteration methods #2026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments, looks pretty good!
python/ray/dataframe/iterator.py
Outdated
@@ -0,0 +1,33 @@ | |||
class PartitionIterator(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should extend iterator
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
python/ray/dataframe/iterator.py
Outdated
n = next(self.iter_cache) | ||
return n | ||
except StopIteration: | ||
self.curr_partition += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do without the index reference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The index reference here is needed, as it is used to get the outer index
or columns
for that partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you handle the increment outside of this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of the Iterator
is to iterate through each partition, it would be possible to define a function to increment the partitions
outside of this class, but that would make the code much more complex.
Right now, it checks if there are any items remaining in the current partition, and if not, increments curr_partition
and gets the next one.
python/ray/dataframe/dataframe.py
Outdated
|
||
return zip(self.index, series) | ||
for v in partition_iterator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you not just return partition_iterator
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, I do this to ensure that the return type of the function is a generator
, which is concordant with pandas. Let me know if you think I should still change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds great, thanks for clarifying.
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple of other comments. Thanks!
python/ray/dataframe/iterator.py
Outdated
@@ -0,0 +1,36 @@ | |||
import collections |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: from collections import Iterator
python/ray/dataframe/iterator.py
Outdated
import collections | ||
|
||
|
||
class PartitionIterator(collections.Iterator): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: class PartitionIterator(Iterator):
python/ray/dataframe/dataframe.py
Outdated
except AttributeError: # Tuple not namedtuple | ||
row_tuple = (idx,) + row_tuple[1:] | ||
return row_tuple | ||
def itertuples_helper(part, i): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking something along the lines of this to resolve the index
comment below:
index_iter = (obj.index for obj in self._row_metadata.partition_series)
def itertuples_helper(part):
df = ray.get(part)
df.columns = self.columns
df.index = next(index_iter)
return df.itertuples(index=index, name=name)
Something like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, this is much better. I've updated the PR in this way. Thanks!
python/ray/dataframe/dataframe.py
Outdated
|
||
return zip(self.index, series) | ||
for v in partition_iterator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds great, thanks for clarifying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, one minor nit.
python/ray/dataframe/dataframe.py
Outdated
series.index = self.columns | ||
series.name = list(self.index)[i] | ||
return series | ||
index_iter = iter([self._row_metadata.partition_series(i).index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer index_iter = (self._row_metadata.partition_series(i).index for i in range(len(self._row_partitions)))
Test PASSed. |
Test PASSed. |
Merged, thanks @kunalgosar! |
* master: (22 commits) [xray] Fix bug in updating actor execution dependencies (ray-project#2064) [DataFrame] Refactor __delitem__ (ray-project#2080) [xray] Better error messaging when pulling from self. (ray-project#2068) Use source code in hash where possible (fix ray-project#2089) (ray-project#2090) Functions for flushing done tasks and evicted objects. (ray-project#2033) Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086) [xray] Corrects Error Handling During Push and Pull. (ray-project#2059) [xray] Sophisticated task dependency management (ray-project#2035) Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081) [DataFrame] Improve performance of iteration methods (ray-project#2026) [DataFrame] Implement to_csv (ray-project#2014) [xray] Lineage cache only requests notifications about remote parent tasks (ray-project#2066) [rllib] Add magic methods for rollouts (ray-project#2024) [DataFrame] Allows DataFrame constructor to take in another DataFrame (ray-project#2072) Pin Pandas version for Travis to 0.22 (ray-project#2075) Fix python linting (ray-project#2076) [xray] Fix GCS table prefixes (ray-project#2065) Some tests for _submit API. (ray-project#2062) [rllib] Queue lib for python 2.7 (ray-project#2057) [autoscaler] Remove faulty assert that breaks during downscaling, pull configs from env (ray-project#2006) ...
* master: (24 commits) Performance fix (ray-project#2110) Use flake8-comprehensions (ray-project#1976) Improve error message printing and suppression. (ray-project#2104) [rllib] [doc] Broken link in ddpg doc YAPF, take 3 (ray-project#2098) [rllib] rename async -> _async (ray-project#2097) fix unused lambda capture (ray-project#2102) [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079) [DataFrame] Update _inherit_docstrings (ray-project#2085) [JavaWorker] Changes to the build system for support java worker (ray-project#2092) [xray] Fix bug in updating actor execution dependencies (ray-project#2064) [DataFrame] Refactor __delitem__ (ray-project#2080) [xray] Better error messaging when pulling from self. (ray-project#2068) Use source code in hash where possible (fix ray-project#2089) (ray-project#2090) Functions for flushing done tasks and evicted objects. (ray-project#2033) Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086) [xray] Corrects Error Handling During Push and Pull. (ray-project#2059) [xray] Sophisticated task dependency management (ray-project#2035) Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081) [DataFrame] Improve performance of iteration methods (ray-project#2026) ...
What do these changes do?
Make DataFrame iteration methods much more performant. Uses generators to iterate through row/column partitions and only fetches data as needed.
Performance Analysis:
New Performance:
Old Performance:
Standard Pandas Performance:
Related issue number
#2025