
[DataFrame] Fix dtypes #1930

Merged 2 commits into ray-project:master on May 3, 2018

Conversation

@pschafhalter (Contributor)

What do these changes do?

  • Calculate dtypes by merging the dtypes from all partitions (see the sketch below).
  • Unify dtypes across partitions.
  • Cache dtypes.

Possible Issues

  • Many operations invalidate the dtypes cache. If any operation that modifies the DataFrame's data fails to invalidate the cache, edge-case bugs are likely.
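
A minimal sketch of the merging idea (illustrative, not the PR's actual code; np.promote_types stands in for whatever promotion rule the PR's _merge_dtypes uses):

import functools

import numpy as np
import pandas as pd

def merge_dtypes(partition_dtypes):
    """Merge per-partition dtypes into a single dtype per column.

    partition_dtypes is a list of pandas.Series mapping column -> dtype,
    one Series per row partition.
    """
    stacked = pd.concat(partition_dtypes, axis=1)
    # np.promote_types returns a dtype both inputs can safely cast to;
    # reducing over one column's per-partition dtypes unifies them.
    return stacked.apply(
        lambda row: functools.reduce(np.promote_types, row.values), axis=1)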

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5020/

@devin-petersohn (Member) left a comment

There's a lot of complicated logic here, but I think all that is really needed to make sure the dtypes are all the same is the following (see the sketch after this list):

1. Materialize the column partitions.
2. Split the column partitions back up into blocks.
3. Reassign the blocks to new blocks.
4. The dtypes will now be unified, because the concat in the column partitions does it for you.
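
A rough sketch of that flow (hypothetical helper name and signature; only NumPy/pandas assumed):

import numpy as np
import pandas as pd

def unify_column_dtypes(n_blocks, *column_blocks):
    """Concat one column partition's blocks, then re-split into blocks.

    pandas.concat promotes mismatched dtypes to a common one, so the
    re-split blocks come back with unified dtypes. This is illustrative,
    not the PR's actual API.
    """
    column = pd.concat(column_blocks, axis=0, copy=False)
    return np.array_split(column, n_blocks, axis=0)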

@@ -282,3 +282,60 @@ def decorator(cls):
        return cls

    return decorator


def _map(func, *objects):
Member:

Why do we need this instead of _map_partitions()?

Contributor (author):

I thought it would go well with the _reduce() implementation for generic objects in the object store. I can remove it and change back to _map_partitions() instead.



@ray.remote
def _reduce(func, *sequence):
Member:

I see what you're doing here, but there might be a cheaper/simpler way.

Contributor:

I'm not sure (at least for getting dtypes) whether this needs a tree-reduce implementation. Regardless, I think you can make it simpler by performing it iteratively, something like

def _reduce(func, oids):
    while len(oids) > 1:
        # Pair off disjoint neighbors; an odd leftover carries over.
        pairs = zip(oids[::2], oids[1::2])
        leftover = oids[-1:] if len(oids) % 2 else []
        oids = [_remote_reduce_helper.remote(func, a, b)
                for a, b in pairs] + leftover
    return oids[0]

You can use _submit to operate on more than a pair at a time, but this way you construct the entire computation graph locally.

@pschafhalter (Contributor, author)

@devin-petersohn Thanks for the feedback!

That would work as well, but wouldn't materializing the column partitions and splitting them back up into blocks be more expensive? From my understanding, that would involve 2 expensive operations on the partitions:

  1. Convert block partitions to column partitions.
  2. Convert column partitions back to block partitions.

Whereas this implementation only involves 1 expensive operation on the partitions:

  1. Set the type of the block partitions.

@devin-petersohn (Member)

@pschafhalter, I think that materializing the columns would be faster because it's not an iterative computation. You can create the column partitions and then return to blocks in the same remote task:

def correct_dtypes(*column_blocks):
    columns = pandas.concat(column_blocks, axis=0, copy=False)
    # Re-split the unified column back into blocks, e.g. with
    # np.array_split (the original comment left this step open).
    blocks = np.array_split(columns, len(column_blocks), axis=0)
    return blocks

@pschafhalter (Contributor, author)

@devin-petersohn that makes sense. In order to get an object ID for each corrected block, I need to specify num_return_vals in correct_dtypes, right?

@devin-petersohn (Member) commented Apr 23, 2018

@pschafhalter Yes, you can do that with correct_dtypes._submit(args=args, num_return_vals=n)
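
Concretely, a hedged sketch of that call (assuming correct_dtypes is a @ray.remote function and column_block_oids is the list of block object IDs; both names are illustrative):

# One object ID comes back per corrected block.
block_oids = correct_dtypes._submit(
    args=column_block_oids, num_return_vals=len(column_block_oids))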

    Object returned by cumulatively applying func.
    """
    size = len(sequence)
    if size == 1:
Contributor:

Generally, most tree-reduce implementations don't reduce all the way down to one value, due to per-task overhead. It's probably worth using a larger base-case size and requiring that the reduction function be compatible with Python's builtin reduce.
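
A hedged sketch of such a base case (the threshold value is illustrative):

from functools import reduce as sequential_reduce

BASE_CASE_SIZE = 8  # illustrative; tune against remote-task overhead

def _reduce_with_base_case(func, sequence):
    # Below the threshold, reduce locally; func must behave like a
    # binary function acceptable to Python's builtin reduce.
    if len(sequence) <= BASE_CASE_SIZE:
        return sequential_reduce(func, sequence)
    # Otherwise split and recurse remotely as in the tree version.
    ...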

    if size == 1:
        return sequence[0]
    left = ray.get(_reduce.remote(func, *sequence[:size // 2]))
    right = ray.get(_reduce.remote(func, *sequence[size // 2:]))
Contributor:

You should start both remote tasks first, then perform the get afterwards.
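
That is, a sketch of the change (keeping the surrounding names):

# Submit both halves before blocking; ray.get on a list then waits for
# both tasks, letting them run in parallel.
left_oid = _reduce.remote(func, *sequence[:size // 2])
right_oid = _reduce.remote(func, *sequence[size // 2:])
left, right = ray.get([left_oid, right_oid])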

@pschafhalter (Contributor, author)

@devin-petersohn @Veryku Sorry for sitting on this for a while. The latest commit should implement the suggested changes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5081/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5082/

@kunalgosar (Contributor)

Brief performance test:

This was done on a file with 79 columns, 1.4 MB in size:

In [17]: df = pd.read_csv("/Users/kunalgosar/Downloads/fundamentals.csv")

In [18]: %time df.dtypes
CPU times: user 22.7 ms, sys: 2.97 ms, total: 25.6 ms
Wall time: 103 ms

This was done on a file with 31 columns, 150 MB in size:

In [19]: df = pd.read_csv("/Users/kunalgosar/Downloads/creditcard.csv")

In [20]: %time df.dtypes
CPU times: user 21.8 ms, sys: 2.98 ms, total: 24.8 ms
Wall time: 1.47 s

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5085/

            self._row_partitions[0]))
        result.index = self.columns
        return result
        # Cache dtypes if necessary
Contributor:

Can this be stored as an OID initially, and then retrieved and cached only when it's called for? This is similar to how index works.

@@ -71,6 +72,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
            Metadata for the new dataframe's columns
        """
        self._row_metadata = self._col_metadata = None
        self._dtypes = None
Contributor:

Once the code below is made to store a remote object, make a dummy call to it here to start the asynchronous computation, but do not get the object yet; that get is a blocking step and can be done as needed.
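
A hedged sketch of the lazy-OID pattern being suggested (attribute names are illustrative, and _merge_dtypes stands for a @ray.remote helper returning a pandas.Series of dtypes; this is not the PR's actual code):

import pandas as pd
import ray

class DataFrame(object):
    def __init__(self, block_partitions):
        self._block_partitions = block_partitions
        # Start the remote dtype computation without blocking on it.
        self._dtypes_cache = _merge_dtypes.remote(block_partitions)

    @property
    def dtypes(self):
        # Block (ray.get) only on first access, then cache the result.
        if not isinstance(self._dtypes_cache, pd.Series):
            self._dtypes_cache = ray.get(self._dtypes_cache)
        return self._dtypes_cache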

@pschafhalter (Contributor, author)

@kunalgosar most of that cost in the performance test is from materializing the DataFrame through read_csv:

In [3]: df = pd.read_csv("/home/peter/Downloads/creditcard.csv")

In [4]: df_repr = repr(df)

In [5]: df._dtypes # Not cached yet

In [6]: %time df.dtypes
CPU times: user 23.7 ms, sys: 2.23 ms, total: 25.9 ms
Wall time: 231 ms

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5121/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5134/

@p-yang (Contributor) left a comment

Some functions should have well-known output dtypes (isna comes to mind again). Would it be possible to pass dtypes_cache for those functions?
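
For instance, a hedged sketch for isna, whose output is always boolean (the _map_partitions-style helper and the cache attribute are illustrative, not the PR's exact API):

import numpy as np
import pandas as pd

def isna(self):
    # Hypothetical helper applying pd.DataFrame.isna to every block.
    result = self._map_partitions(pd.DataFrame.isna)
    # isna always yields bool columns, so the dtypes cache can be filled
    # in directly instead of being recomputed from the partitions.
    result._dtypes_cache = pd.Series(
        [np.dtype(bool)] * len(self.columns), index=self.columns)
    return result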

@@ -112,6 +118,10 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
        elif col_partitions is not None:
            axis = 1
            partitions = col_partitions
            # All partitions will already have correct dtypes
            self._dtypes_cache = [
Contributor:

Is it more performant to have a special case here for col_partitions? Otherwise I'd imagine this is already handled by the added code below.

Contributor (author):

I think so. Otherwise, we would create block partitions from the column partitions, materialize the column partitions again to correct the dtypes, and then split those back into block partitions.

@pschafhalter (Contributor, author)

@Veryku agreed on passing dtypes_cache for functions with well-known output types. I discussed this with @devin-petersohn and created issue #1987. I'll add the optimizations in a subsequent PR.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5152/

Commits pushed in this pull request:

  • bug fixes
  • Unify dtypes on DataFrame creation
  • Formatting and comments
  • Cache dtypes
  • Fix bug in _merge_dtypes
  • Fix bug
  • Changed caching logic
  • Fix dtypes issue in read_csv
  • Invalidate dtypes cache when inserting column
  • Simplify unifying dtypes and improve caching
  • Fix typo
  • Better caching of dtypes
  • Fix merge conflicts

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/5154/

@devin-petersohn (Member)

Passes private-travis. Thanks @pschafhalter!

@devin-petersohn devin-petersohn merged commit d67b786 into ray-project:master May 3, 2018
alok added a commit to alok/ray that referenced this pull request May 6, 2018
* magic-methods:
  fmt
  Fix IndentationError
  Write magic methods for SampleBatch/PartialRollout
  Clean up syntax for supported Python versions. (ray-project#1963)
  [DataFrame] Implements mode, to_datetime, and get_dummies (ray-project#1956)
  [DataFrame] Fix dtypes (ray-project#1930)
  keep_dims -> keepdims (ray-project#1980)
  add pthread linking (ray-project#1986)
  [DataFrame] Add layer of abstraction to allow OID instantiation (ray-project#1984)
  [DataFrame] Fix blocking issue on _IndexMetadata passing (ray-project#1965)
alok added a commit to alok/ray that referenced this pull request May 8, 2018
* master: (21 commits)
  Expand local_dir in Trial init (ray-project#2013)
  Fixing ascii error for Python2 (ray-project#2009)
  [DataFrame] Implements df.update (ray-project#1997)
  [DataFrame] Implements df.as_matrix (ray-project#2001)
  [DataFrame] Implement quantile (ray-project#1992)
  [DataFrame] Implement sort_values and sort_index (ray-project#1977)
  [DataFrame] Implement rank (ray-project#1991)
  [DataFrame] Implemented prod, product, added test suite (ray-project#1994)
  [DataFrame] Implemented __setitem__, select_dtypes, and astype (ray-project#1941)
  [DataFrame] Implement diff (ray-project#1996)
  [DataFrame] Implemented nunique, skew (ray-project#1995)
  [DataFrame] Implements filter and dropna (ray-project#1959)
  [DataFrame] Implements df.pipe (ray-project#1999)
  [DataFrame] Apply() for Lists and Dicts (ray-project#1973)
  Clean up syntax for supported Python versions. (ray-project#1963)
  [DataFrame] Implements mode, to_datetime, and get_dummies (ray-project#1956)
  [DataFrame] Fix dtypes (ray-project#1930)
  keep_dims -> keepdims (ray-project#1980)
  add pthread linking (ray-project#1986)
  [DataFrame] Add layer of abstraction to allow OID instantiation (ray-project#1984)
  ...
alok added a commit to alok/ray that referenced this pull request May 9, 2018
* master: (25 commits)
  [DataFrame] Add direct pandas imports for MVP (ray-project#1960)
  Make ActorHandles pickleable, also make proper ActorHandle and ActorC… (ray-project#2007)
  Expand local_dir in Trial init (ray-project#2013)
  Fixing ascii error for Python2 (ray-project#2009)
  [DataFrame] Implements df.update (ray-project#1997)
  [DataFrame] Implements df.as_matrix (ray-project#2001)
  [DataFrame] Implement quantile (ray-project#1992)
  [DataFrame] Implement sort_values and sort_index (ray-project#1977)
  [DataFrame] Implement rank (ray-project#1991)
  [DataFrame] Implemented prod, product, added test suite (ray-project#1994)
  [DataFrame] Implemented __setitem__, select_dtypes, and astype (ray-project#1941)
  [DataFrame] Implement diff (ray-project#1996)
  [DataFrame] Implemented nunique, skew (ray-project#1995)
  [DataFrame] Implements filter and dropna (ray-project#1959)
  [DataFrame] Implements df.pipe (ray-project#1999)
  [DataFrame] Apply() for Lists and Dicts (ray-project#1973)
  Clean up syntax for supported Python versions. (ray-project#1963)
  [DataFrame] Implements mode, to_datetime, and get_dummies (ray-project#1956)
  [DataFrame] Fix dtypes (ray-project#1930)
  keep_dims -> keepdims (ray-project#1980)
  ...
@pschafhalter pschafhalter deleted the df-fix-dtypes branch May 22, 2018 17:03