[DataFrame] Fix dtypes #1930
Conversation
Test PASSed.
There's a lot of complicated logic here, but I think all that is really needed to make sure dtypes are all the same is the following (see the snippet after this list for step 4's effect):
1.) Materialize column partitions
2.) Split up column partitions into blocks again
3.) Reassign blocks to new blocks
4.) Dtypes will now be unified, because the concat in column partitions will do it for you
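Step 4 leans on pandas' type-promotion rules; a minimal standalone illustration (toy data, not from the PR):

import pandas

# Two blocks of the same column with mismatched dtypes:
a = pandas.DataFrame({"x": [1, 2]})        # x is int64
b = pandas.DataFrame({"x": [3.5, 4.5]})    # x is float64

# concat promotes both blocks to one common dtype:
print(pandas.concat([a, b], axis=0).dtypes)  # x    float64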
python/ray/dataframe/utils.py (outdated)
@@ -282,3 +282,60 @@ def decorator(cls):
        return cls

    return decorator


def _map(func, *objects):
Why do we need this instead of _map_partitions()?
I thought it would go well with the _reduce() implementation for generic objects in the object store. I can remove it and change back to _map_partitions() instead.
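For context, a generic map of that shape could be as small as the sketch below; only the signature appears in the diff, the body is an assumption:

import ray

@ray.remote
def _map(func, *objects):
    # Ray materializes ObjectID arguments, so func sees plain values.
    return func(*objects)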
python/ray/dataframe/utils.py (outdated)
@ray.remote
def _reduce(func, *sequence):
I see what you're doing here, but there might be a cheaper/simpler way.
I'm not sure (at least for getting dtypes) whether this needs a tree-reduce implementation. Regardless, I think you can make it simpler by performing it iteratively, something like

def _reduce(func, oids):
    while len(oids) > 1:
        oids = [_remote_reduce_helper.remote(func, item1, item2)
                for item1, item2 in pairwise(oids)]
    return oids[0]

You can use _submit to perform on more than just a pair at a time, but this way you construct the entire computation graph locally.
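A self-contained version of that scheme might look like the sketch below, reading pairwise as non-overlapping pairs and spelling out the helper (all names illustrative):

import ray

@ray.remote
def _reduce_pair(func, left, right):
    # Combine two already-computed partial results.
    return func(left, right)

def iterative_reduce(func, oids):
    # Builds the whole task graph locally, one level per pass.
    while len(oids) > 1:
        level = [_reduce_pair.remote(func, oids[i], oids[i + 1])
                 for i in range(0, len(oids) - 1, 2)]
        if len(oids) % 2:
            level.append(oids[-1])  # carry an unpaired leftover forward
        oids = level
    return oids[0]  # an object ID; ray.get() it when needed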
@devin-petersohn Thanks for the feedback! That would work as well, but wouldn't materializing column partitions and splitting them back up into blocks be more expensive? From my understanding, that approach results in 2 expensive operations on partitions, whereas this implementation only involves 1 expensive operation on partitions.
@pschafhalter, I think that materializing the columns would be faster because it's not an iterative computation. You can create the column partitions and then return to blocks in the same remote task:

import numpy as np
import pandas

def correct_dtypes(*column_blocks):
    # concat unifies dtypes across this column's blocks
    columns = pandas.concat(column_blocks, axis=0, copy=False)
    # split back into row blocks of the original sizes (the original comment
    # left this step as pseudocode; this completion is a sketch)
    bounds = np.cumsum([0] + [len(b) for b in column_blocks])
    return [columns.iloc[i:j] for i, j in zip(bounds, bounds[1:])]
@devin-petersohn that makes sense. In order to get an object ID for each corrected block, I need to specify …
@pschafhalter Yes, you can do that with …
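The truncated exchange presumably concerns requesting one return value per block. In the Ray API of this era that looked roughly like the sketch below; the _submit signature and the num_return_vals parameter are assumptions about that API generation, and correct_dtypes is presumed to be a remote function returning one block per input:

# Assumed era-specific API: _submit lets the caller choose how many
# ObjectIDs the task returns, so each corrected block gets its own ID.
block_oids = correct_dtypes._submit(args=list(column_blocks),
                                    num_return_vals=len(column_blocks))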
python/ray/dataframe/utils.py (outdated)
        Object returned by cumulatively applying func.
    """
    size = len(sequence)
    if size == 1:
Generally most tree-reduce implementations don't reduce down to one value due to overhead, so it's probably more worth it to place a larger base case size and enforce that the reduction function is compatible with Python's builtin reduce.
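A sketch of that suggestion, falling back to Python's builtin reduce below a threshold (the threshold value and names are illustrative):

from functools import reduce

import ray

BASE_CASE_SIZE = 8  # illustrative; tune to amortize per-task overhead

@ray.remote
def _tree_reduce(func, *sequence):
    if len(sequence) <= BASE_CASE_SIZE:
        # Small inputs: reduce locally instead of spawning more tasks.
        return reduce(func, sequence)
    mid = len(sequence) // 2
    left = _tree_reduce.remote(func, *sequence[:mid])
    right = _tree_reduce.remote(func, *sequence[mid:])
    # Both halves are in flight before we block on either.
    return func(*ray.get([left, right]))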
python/ray/dataframe/utils.py (outdated)
    if size == 1:
        return sequence[0]
    left = ray.get(_reduce.remote(func, *sequence[:size // 2]))
    right = ray.get(_reduce.remote(func, *sequence[size // 2:]))
You should start both remote tasks first, then perform the get afterwards.
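That is, roughly (a sketch against the diff above):

# Launch both halves first so they run concurrently, then block once:
left_oid = _reduce.remote(func, *sequence[:size // 2])
right_oid = _reduce.remote(func, *sequence[size // 2:])
left, right = ray.get([left_oid, right_oid])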
@devin-petersohn @Veryku Sorry for sitting on this for a while. The latest commit should implement the suggested changes.
Force-pushed from 237e963 to a37a0e4.
Test PASSed.
Brief performance test. On a file with 79 columns and a size of 1.4 MB:
…
On a file with 31 columns and a size of 150 MB:
…
Test PASSed.
python/ray/dataframe/dataframe.py (outdated)
            self._row_partitions[0]))
        result.index = self.columns
        return result
    # Cache dtypes if necessary
Can this be made an OID initially, and then fetched and cached when called for? This is similar to index.
python/ray/dataframe/dataframe.py (outdated)
@@ -71,6 +72,7 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
            Metadata for the new dataframe's columns
        """
        self._row_metadata = self._col_metadata = None
        self._dtypes = None
Once the code below is made to store a remote object, make a dummy call to it here to start the asynchronous call, but do not get the object yet. That is a blocking step and can be done as needed.
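Putting both review comments together, a minimal sketch of the pattern; class and helper names are illustrative, not the PR's code:

import pandas
import ray

@ray.remote
def _dtypes_of(df):
    # Illustrative helper: compute dtypes from one representative partition.
    return df.dtypes

class LazyDtypesFrame:
    def __init__(self, row_partitions):
        self._row_partitions = row_partitions
        # Start the asynchronous computation eagerly, but don't block on it.
        self._dtypes_cache = _dtypes_of.remote(row_partitions[0])

    @property
    def dtypes(self):
        # Block and materialize only on first access, then cache the result.
        if not isinstance(self._dtypes_cache, pandas.Series):
            self._dtypes_cache = ray.get(self._dtypes_cache)
        return self._dtypes_cache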
@kunalgosar most of that cost in the performance test is from materializing the DataFrame through …
Test PASSed.
Force-pushed from a67a931 to 7f6f703.
Test FAILed.
Some functions should have well-known output dtypes (isna comes to mind again). Would it be possible to pass dtypes_cache for those functions?
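For instance, isna always produces boolean columns, so the cache could be filled without touching any partition. A sketch; the dtypes_cache constructor argument is the hypothetical being proposed:

import numpy as np
import pandas

# The result of isna is all-bool regardless of the input dtypes:
columns = pandas.Index(["a", "b", "c"])  # toy column index
known_dtypes = pandas.Series([np.dtype("bool")] * len(columns), index=columns)
print(known_dtypes)
# Passing known_dtypes as dtypes_cache would skip the dtype reduction entirely.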
python/ray/dataframe/dataframe.py (outdated)
@@ -112,6 +118,10 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
        elif col_partitions is not None:
            axis = 1
            partitions = col_partitions
            # All partitions will already have correct dtypes
            self._dtypes_cache = [
Is it more performant to have a special case here for col_partitions? Otherwise I'd imagine this is already handled by the added code below.
I think so. Otherwise, we create block partitions from column partitions, materialize column partitions again to correct dtypes, and split that back into block partitions.
@Veryku agreed on passing …
Force-pushed from 7f6f703 to 27f3929.
Test PASSed.
Commits in this push:
* bug fixes
* Unify dtypes on DataFrame creation
* Formatting and comments
* Cache dtypes
* Fix bug in _merge_dtypes
* Fix bug
* Changed caching logic
* Fix dtypes issue in read_csv
* Invalidate dtypes cache when inserting column
* Simplify unifying dtypes and improve caching
* Fix typo
* Better caching of dtypes
* Fix merge conflicts
Force-pushed from 27f3929 to e715e12.
Test PASSed.
Passes private-travis. Thanks @pschafhalter!
* magic-methods:
  * fmt
  * Fix IndentationError
  * Write magic methods for SampleBatch/PartialRollout
  * Clean up syntax for supported Python versions. (ray-project#1963)
  * [DataFrame] Implements mode, to_datetime, and get_dummies (ray-project#1956)
  * [DataFrame] Fix dtypes (ray-project#1930)
  * keep_dims -> keepdims (ray-project#1980)
  * add pthread linking (ray-project#1986)
  * [DataFrame] Add layer of abstraction to allow OID instantiation (ray-project#1984)
  * [DataFrame] Fix blocking issue on _IndexMetadata passing (ray-project#1965)
* master: (21 commits)
  * Expand local_dir in Trial init (ray-project#2013)
  * Fixing ascii error for Python2 (ray-project#2009)
  * [DataFrame] Implements df.update (ray-project#1997)
  * [DataFrame] Implements df.as_matrix (ray-project#2001)
  * [DataFrame] Implement quantile (ray-project#1992)
  * [DataFrame] Impement sort_values and sort_index (ray-project#1977)
  * [DataFrame] Implement rank (ray-project#1991)
  * [DataFrame] Implemented prod, product, added test suite (ray-project#1994)
  * [DataFrame] Implemented __setitem__, select_dtypes, and astype (ray-project#1941)
  * [DataFrame] Implement diff (ray-project#1996)
  * [DataFrame] Implemented nunique, skew (ray-project#1995)
  * [DataFrame] Implements filter and dropna (ray-project#1959)
  * [DataFrame] Implements df.pipe (ray-project#1999)
  * [DataFrame] Apply() for Lists and Dicts (ray-project#1973)
  * Clean up syntax for supported Python versions. (ray-project#1963)
  * [DataFrame] Implements mode, to_datetime, and get_dummies (ray-project#1956)
  * [DataFrame] Fix dtypes (ray-project#1930)
  * keep_dims -> keepdims (ray-project#1980)
  * add pthread linking (ray-project#1986)
  * [DataFrame] Add layer of abstraction to allow OID instantiation (ray-project#1984)
  * ...
* master: (25 commits)
  * [DataFrame] Add direct pandas imports for MVP (ray-project#1960)
  * Make ActorHandles pickleable, also make proper ActorHandle and ActorC… (ray-project#2007)
  * Expand local_dir in Trial init (ray-project#2013)
  * Fixing ascii error for Python2 (ray-project#2009)
  * [DataFrame] Implements df.update (ray-project#1997)
  * [DataFrame] Implements df.as_matrix (ray-project#2001)
  * [DataFrame] Implement quantile (ray-project#1992)
  * [DataFrame] Impement sort_values and sort_index (ray-project#1977)
  * [DataFrame] Implement rank (ray-project#1991)
  * [DataFrame] Implemented prod, product, added test suite (ray-project#1994)
  * [DataFrame] Implemented __setitem__, select_dtypes, and astype (ray-project#1941)
  * [DataFrame] Implement diff (ray-project#1996)
  * [DataFrame] Implemented nunique, skew (ray-project#1995)
  * [DataFrame] Implements filter and dropna (ray-project#1959)
  * [DataFrame] Implements df.pipe (ray-project#1999)
  * [DataFrame] Apply() for Lists and Dicts (ray-project#1973)
  * Clean up syntax for supported Python versions. (ray-project#1963)
  * [DataFrame] Implements mode, to_datetime, and get_dummies (ray-project#1956)
  * [DataFrame] Fix dtypes (ray-project#1930)
  * keep_dims -> keepdims (ray-project#1980)
  * ...