Skip to content

[DataFrame] Improve performance of iteration methods #2026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 17, 2018

Conversation

kunalgosar
Copy link
Contributor

@kunalgosar kunalgosar commented May 10, 2018

What do these changes do?

Make DataFrame iteration methods much more performant. Uses generators to iterate through row/column partitions and only fetches data as needed.

Performance Analysis:

New Performance:

In [6]: df = pd.DataFrame(np.random.randint(0,100,size=(100000, 200)))

In [7]: %time x = list(df.iterrows())
CPU times: user 6.68 s, sys: 106 ms, total: 6.79 s
Wall time: 7.35 s

In [8]: %time x = list(df.items())
CPU times: user 162 ms, sys: 40 ms, total: 202 ms
Wall time: 511 ms

In [9]: %time x = list(df.itertuples())
CPU times: user 1.69 s, sys: 167 ms, total: 1.86 s
Wall time: 2.18 s

Old Performance:

In [5]: df = pd.DataFrame(np.random.randint(0,100,size=(100000, 200)))

In [6]: %time x = list(df.iterrows())
CPU times: user 7min 16s, sys: 5.28 s, total: 7min 22s
Wall time: 7min 57s

In [7]: %time x = list(df.items())
CPU times: user 1.18 s, sys: 412 ms, total: 1.59 s
Wall time: 3.13 s

In [8]: %time x = list(df.itertuples())
CPU times: user 5.45 s, sys: 515 ms, total: 5.97 s
Wall time: 9.81 s

Standard Pandas Performance:

In [4]: df = pd.DataFrame(np.random.randint(0,100,size=(100000, 200)))

In [5]: %time x = list(df.iterrows())
CPU times: user 6.44 s, sys: 107 ms, total: 6.54 s
Wall time: 6.58 s

In [6]: %time x = list(df.items())
CPU times: user 128 ms, sys: 29.9 ms, total: 158 ms
Wall time: 163 ms

In [7]: %time x = list(df.itertuples())
CPU times: user 2.63 s, sys: 176 ms, total: 2.81 s
Wall time: 2.9 s

Related issue number

#2025

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5302/
Test PASSed.

Copy link
Member

@devin-petersohn devin-petersohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comments, looks pretty good!

@@ -0,0 +1,33 @@
class PartitionIterator(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should extend iterator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

n = next(self.iter_cache)
return n
except StopIteration:
self.curr_partition += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do without the index reference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The index reference here is needed, as it is used to get the outer index or columns for that partition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you handle the increment outside of this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the Iterator is to iterate through each partition, it would be possible to define a function to increment the partitions outside of this class, but that would make the code much more complex.

Right now, it checks if there are any items remaining in the current partition, and if not, increments curr_partition and gets the next one.


return zip(self.index, series)
for v in partition_iterator:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you not just return partition_iterator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I do this to ensure that the return type of the function is a generator, which is concordant with pandas. Let me know if you think I should still change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds great, thanks for clarifying.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5318/
Test PASSed.

Copy link
Member

@devin-petersohn devin-petersohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple of other comments. Thanks!

@@ -0,0 +1,36 @@
import collections
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: from collections import Iterator

import collections


class PartitionIterator(collections.Iterator):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: class PartitionIterator(Iterator):

except AttributeError: # Tuple not namedtuple
row_tuple = (idx,) + row_tuple[1:]
return row_tuple
def itertuples_helper(part, i):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking something along the lines of this to resolve the index comment below:

index_iter = (obj.index for obj in self._row_metadata.partition_series)

def itertuples_helper(part):
    df = ray.get(part)
    df.columns = self.columns
    df.index = next(index_iter)
    return df.itertuples(index=index, name=name)

Something like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this is much better. I've updated the PR in this way. Thanks!


return zip(self.index, series)
for v in partition_iterator:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds great, thanks for clarifying.

Copy link
Member

@devin-petersohn devin-petersohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, one minor nit.

series.index = self.columns
series.name = list(self.index)[i]
return series
index_iter = iter([self._row_metadata.partition_series(i).index
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer index_iter = (self._row_metadata.partition_series(i).index for i in range(len(self._row_partitions)))

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5436/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5443/
Test PASSed.

@devin-petersohn devin-petersohn merged commit afbb260 into ray-project:master May 17, 2018
@devin-petersohn
Copy link
Member

Merged, thanks @kunalgosar!

alok added a commit to alok/ray that referenced this pull request May 18, 2018
* master: (22 commits)
  [xray] Fix bug in updating actor execution dependencies (ray-project#2064)
  [DataFrame] Refactor __delitem__ (ray-project#2080)
  [xray] Better error messaging when pulling from self. (ray-project#2068)
  Use source code in hash where possible (fix ray-project#2089) (ray-project#2090)
  Functions for flushing done tasks and evicted objects. (ray-project#2033)
  Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086)
  [xray] Corrects Error Handling During Push and Pull. (ray-project#2059)
  [xray] Sophisticated task dependency management (ray-project#2035)
  Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081)
  [DataFrame] Improve performance of iteration methods (ray-project#2026)
  [DataFrame] Implement to_csv (ray-project#2014)
  [xray] Lineage cache only requests notifications about remote parent tasks (ray-project#2066)
  [rllib] Add magic methods for rollouts (ray-project#2024)
  [DataFrame] Allows DataFrame constructor to take in another DataFrame (ray-project#2072)
  Pin Pandas version for Travis to 0.22 (ray-project#2075)
  Fix python linting (ray-project#2076)
  [xray] Fix GCS table prefixes (ray-project#2065)
  Some tests for _submit API. (ray-project#2062)
  [rllib] Queue lib for python 2.7 (ray-project#2057)
  [autoscaler] Remove faulty assert that breaks during downscaling, pull configs from env (ray-project#2006)
  ...
alok added a commit to alok/ray that referenced this pull request May 21, 2018
* master: (24 commits)
  Performance fix (ray-project#2110)
  Use flake8-comprehensions (ray-project#1976)
  Improve error message printing and suppression. (ray-project#2104)
  [rllib] [doc] Broken link in ddpg doc
  YAPF, take 3 (ray-project#2098)
  [rllib] rename async -> _async (ray-project#2097)
  fix unused lambda capture (ray-project#2102)
  [xray] Use pubsub instead of timeout for ObjectManager Pull. (ray-project#2079)
  [DataFrame] Update _inherit_docstrings (ray-project#2085)
  [JavaWorker] Changes to the build system for support java worker (ray-project#2092)
  [xray] Fix bug in updating actor execution dependencies (ray-project#2064)
  [DataFrame] Refactor __delitem__ (ray-project#2080)
  [xray] Better error messaging when pulling from self. (ray-project#2068)
  Use source code in hash where possible (fix ray-project#2089) (ray-project#2090)
  Functions for flushing done tasks and evicted objects. (ray-project#2033)
  Fix compilation error for RAY_USE_NEW_GCS with latest clang. (ray-project#2086)
  [xray] Corrects Error Handling During Push and Pull. (ray-project#2059)
  [xray] Sophisticated task dependency management (ray-project#2035)
  Support calling positional arguments by keyword (fix ray-project#998) (ray-project#2081)
  [DataFrame] Improve performance of iteration methods (ray-project#2026)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants