-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DataFrame] Fully implement append, concat and join #1932
Changes from all commits
92cb5f1
05f2130
37d376f
46e1ab4
ec6d9dd
246abed
92fe4bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,90 +1,130 @@ | ||
import pandas as pd | ||
import numpy as np | ||
from .dataframe import DataFrame as rdf | ||
from .utils import ( | ||
from_pandas, | ||
_deploy_func) | ||
from functools import reduce | ||
from __future__ import absolute_import | ||
from __future__ import division | ||
from __future__ import print_function | ||
|
||
import pandas | ||
from .dataframe import DataFrame | ||
from .utils import _reindex_helper | ||
|
||
|
||
def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False, | ||
keys=None, levels=None, names=None, verify_integrity=False, | ||
copy=True): | ||
|
||
def _concat(frame1, frame2): | ||
# Check type on objects | ||
# Case 1: Both are Pandas DF | ||
if isinstance(frame1, pd.DataFrame) and \ | ||
isinstance(frame2, pd.DataFrame): | ||
|
||
return pd.concat((frame1, frame2), axis, join, join_axes, | ||
if keys is not None: | ||
objs = [objs[k] for k in keys] | ||
else: | ||
objs = list(objs) | ||
|
||
if len(objs) == 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to handle case of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolved below |
||
raise ValueError("No objects to concatenate") | ||
|
||
objs = [obj for obj in objs if obj is not None] | ||
|
||
if len(objs) == 0: | ||
raise ValueError("All objects passed were None") | ||
|
||
try: | ||
type_check = next(obj for obj in objs | ||
if not isinstance(obj, (pandas.Series, | ||
pandas.DataFrame, | ||
DataFrame))) | ||
except StopIteration: | ||
type_check = None | ||
if type_check is not None: | ||
raise ValueError("cannot concatenate object of type \"{0}\"; only " | ||
"pandas.Series, pandas.DataFrame, " | ||
"and ray.dataframe.DataFrame objs are " | ||
"valid", type(type_check)) | ||
|
||
all_series = all([isinstance(obj, pandas.Series) | ||
for obj in objs]) | ||
if all_series: | ||
return pandas.concat(objs, axis, join, join_axes, | ||
ignore_index, keys, levels, names, | ||
verify_integrity, copy) | ||
|
||
if not (isinstance(frame1, rdf) and | ||
isinstance(frame2, rdf)) and join == 'inner': | ||
raise NotImplementedError( | ||
"Obj as dicts not implemented. To contribute to " | ||
"Pandas on Ray, please visit github.com/ray-project/ray." | ||
) | ||
|
||
# Case 2: Both are different types | ||
if isinstance(frame1, pd.DataFrame): | ||
frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1) | ||
if isinstance(frame2, pd.DataFrame): | ||
frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1) | ||
|
||
# Case 3: Both are Ray DF | ||
if isinstance(frame1, rdf) and \ | ||
isinstance(frame2, rdf): | ||
|
||
new_columns = frame1.columns.join(frame2.columns, how=join) | ||
|
||
def _reindex_helper(pdf, old_columns, join): | ||
pdf.columns = old_columns | ||
if join == 'outer': | ||
pdf = pdf.reindex(columns=new_columns) | ||
else: | ||
pdf = pdf[new_columns] | ||
pdf.columns = pd.RangeIndex(len(new_columns)) | ||
|
||
return pdf | ||
if isinstance(objs, dict): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This case is not necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this case not necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually it is, my comment was wrong. I had understood that you could only pass in a dictionary with keys specified. Turns out you can pass in a dictionary by itself. |
||
raise NotImplementedError( | ||
"Obj as dicts not implemented. To contribute to " | ||
"Pandas on Ray, please visit github.com/ray-project/ray.") | ||
|
||
f1_columns, f2_columns = frame1.columns, frame2.columns | ||
new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p, | ||
f1_columns, join), part) for | ||
part in frame1._row_partitions] | ||
new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p, | ||
f2_columns, join), part) for | ||
part in frame2._row_partitions] | ||
axis = pandas.DataFrame()._get_axis_number(axis) | ||
|
||
return rdf(row_partitions=new_f1 + new_f2, columns=new_columns, | ||
index=frame1.index.append(frame2.index)) | ||
if join not in ['inner', 'outer']: | ||
raise ValueError("Only can inner (intersect) or outer (union) join the" | ||
" other axis") | ||
|
||
# (TODO) Group all the pandas dataframes | ||
# We need this in a list because we use it later. | ||
all_index, all_columns = list(zip(*[(obj.index, obj.columns) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will not work for Panel objects which do not have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, I'm going to just drop |
||
for obj in objs])) | ||
|
||
if isinstance(objs, dict): | ||
raise NotImplementedError( | ||
"Obj as dicts not implemented. To contribute to " | ||
"Pandas on Ray, please visit github.com/ray-project/ray." | ||
) | ||
def series_to_df(series, columns): | ||
df = pandas.DataFrame(series) | ||
df.columns = columns | ||
return DataFrame(df) | ||
|
||
axis = pd.DataFrame()._get_axis_number(axis) | ||
if axis == 1: | ||
raise NotImplementedError( | ||
"Concat not implemented for axis=1. To contribute to " | ||
"Pandas on Ray, please visit github.com/ray-project/ray." | ||
) | ||
|
||
all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs]) | ||
if all_pd: | ||
result = pd.concat(objs, axis, join, join_axes, | ||
ignore_index, keys, levels, names, | ||
verify_integrity, copy) | ||
# Pandas puts all of the Series in a single column named 0. This is | ||
# true regardless of the existence of another column named 0 in the | ||
# concat. | ||
if axis == 0: | ||
objs = [series_to_df(obj, [0]) | ||
if isinstance(obj, pandas.Series) else obj for obj in objs] | ||
else: | ||
result = reduce(_concat, objs) | ||
# Pandas starts the count at 0 so this will increment the names as | ||
# long as there's a new nameless Series being added. | ||
def name_incrementer(i): | ||
val = i[0] | ||
i[0] += 1 | ||
return val | ||
|
||
i = [0] | ||
objs = [series_to_df(obj, obj.name if obj.name is not None | ||
else name_incrementer(i)) | ||
if isinstance(obj, pandas.Series) else obj for obj in objs] | ||
|
||
# Using concat on the columns and index is fast because they're empty, | ||
# and it forces the error checking. It also puts the columns in the | ||
# correct order for us. | ||
final_index = \ | ||
pandas.concat([pandas.DataFrame(index=idx) for idx in all_index], | ||
axis=axis, join=join, join_axes=join_axes, | ||
ignore_index=ignore_index, keys=keys, levels=levels, | ||
names=names, verify_integrity=verify_integrity, | ||
copy=False).index | ||
final_columns = \ | ||
pandas.concat([pandas.DataFrame(columns=col) | ||
for col in all_columns], | ||
axis=axis, join=join, join_axes=join_axes, | ||
ignore_index=ignore_index, keys=keys, levels=levels, | ||
names=names, verify_integrity=verify_integrity, | ||
copy=False).columns | ||
|
||
# Put all of the DataFrames into Ray format | ||
# TODO just partition the DataFrames instead of building a new Ray DF. | ||
objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All pandas.Series objects would already be DataFrames by this point. Does it make sense to combine the steps? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's not completely efficient to do it this way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolved in |
||
pandas.Series)) else obj | ||
for obj in objs] | ||
|
||
# Here we reuse all_columns/index so we don't have to materialize objects | ||
# from remote memory built in the previous line. In the future, we won't be | ||
# building new DataFrames, rather just partitioning the DataFrames. | ||
if axis == 0: | ||
new_rows = [_reindex_helper.remote(part, all_columns[i], | ||
final_columns, axis) | ||
for i in range(len(objs)) | ||
for part in objs[i]._row_partitions] | ||
|
||
return DataFrame(row_partitions=new_rows, | ||
columns=final_columns, | ||
index=final_index) | ||
|
||
if isinstance(result, pd.DataFrame): | ||
return from_pandas(result, len(result) / 2**16 + 1) | ||
|
||
return result | ||
else: | ||
new_columns = [_reindex_helper.remote(part, all_index[i], | ||
final_index, axis) | ||
for i in range(len(objs)) | ||
for part in objs[i]._col_partitions] | ||
|
||
return DataFrame(col_partitions=new_columns, | ||
columns=final_columns, | ||
index=final_index) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,8 @@ | |
_blocks_to_col, | ||
_blocks_to_row, | ||
_create_block_partitions, | ||
_inherit_docstrings) | ||
_inherit_docstrings, | ||
_reindex_helper) | ||
from . import get_npartitions | ||
from .index_metadata import _IndexMetadata | ||
|
||
|
@@ -911,9 +912,49 @@ def remote_func(df): | |
return self._arithmetic_helper(remote_func, axis, level) | ||
|
||
def append(self, other, ignore_index=False, verify_integrity=False): | ||
raise NotImplementedError( | ||
"To contribute to Pandas on Ray, please visit " | ||
"github.com/ray-project/ray.") | ||
"""Append another DataFrame/list/Series to this one. | ||
|
||
Args: | ||
other: The object to append to this. | ||
ignore_index: Ignore the index on appending. | ||
verify_integrity: Verify the integrity of the index on completion. | ||
|
||
Returns: | ||
A new DataFrame containing the concatenated values. | ||
""" | ||
if isinstance(other, (pd.Series, dict)): | ||
if isinstance(other, dict): | ||
other = pd.Series(other) | ||
if other.name is None and not ignore_index: | ||
raise TypeError('Can only append a Series if ignore_index=True' | ||
' or if the Series has a name') | ||
|
||
if other.name is None: | ||
index = None | ||
else: | ||
# other must have the same index name as self, otherwise | ||
# index name will be reset | ||
index = pd.Index([other.name], name=self.index.name) | ||
|
||
combined_columns = self.columns.tolist() + self.columns.union( | ||
other.index).difference(self.columns).tolist() | ||
other = other.reindex(combined_columns, copy=False) | ||
other = pd.DataFrame(other.values.reshape((1, len(other))), | ||
index=index, | ||
columns=combined_columns) | ||
other = other._convert(datetime=True, timedelta=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the current DataFrame here need to reindex its columns to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will happen in |
||
elif isinstance(other, list) and not isinstance(other[0], DataFrame): | ||
other = pd.DataFrame(other) | ||
if (self.columns.get_indexer(other.columns) >= 0).all(): | ||
other = other.loc[:, self.columns] | ||
|
||
from .concat import concat | ||
if isinstance(other, (list, tuple)): | ||
to_concat = [self] + other | ||
else: | ||
to_concat = [self, other] | ||
return concat(to_concat, ignore_index=ignore_index, | ||
verify_integrity=verify_integrity) | ||
|
||
def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, | ||
args=(), **kwds): | ||
|
@@ -2028,9 +2069,91 @@ def _replace_index(row_tuple, idx): | |
|
||
def join(self, other, on=None, how='left', lsuffix='', rsuffix='', | ||
sort=False): | ||
raise NotImplementedError( | ||
"To contribute to Pandas on Ray, please visit " | ||
"github.com/ray-project/ray.") | ||
"""Join two or more DataFrames, or a DataFrame with a collection. | ||
|
||
Args: | ||
other: What to join this DataFrame with. | ||
on: A column name to use from the left for the join. | ||
how: What type of join to conduct. | ||
lsuffix: The suffix to add to column names that match on left. | ||
rsuffix: The suffix to add to column names that match on right. | ||
sort: Whether or not to sort. | ||
|
||
Returns: | ||
The joined DataFrame. | ||
""" | ||
|
||
if on is not None: | ||
raise NotImplementedError("Not yet.") | ||
|
||
if isinstance(other, pd.Series): | ||
if other.name is None: | ||
raise ValueError("Other Series must have a name") | ||
other = DataFrame({other.name: other}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can pass other directly into DataFrame constructor. It carries the series name over to the column name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is similar to how Pandas does it, so I vote we keep it this way. It's probably for clarity. |
||
|
||
if isinstance(other, DataFrame): | ||
if on is not None: | ||
index = self[on] | ||
else: | ||
index = self.index | ||
|
||
new_index = index.join(other.index, how=how, sort=sort) | ||
|
||
# Joining two empty DataFrames is fast, and error checks for us. | ||
new_column_labels = pd.DataFrame(columns=self.columns) \ | ||
.join(pd.DataFrame(columns=other.columns), | ||
lsuffix=lsuffix, rsuffix=rsuffix).columns | ||
|
||
# Join is a concat once we have shuffled the data internally. | ||
# We shuffle the data by computing the correct order. | ||
# Another important thing to note: We set the current self index | ||
# to the index variable which may be 'on'. | ||
new_self = [_reindex_helper.remote(col, index, new_index, 1) | ||
for col in self._col_partitions] | ||
new_other = [_reindex_helper.remote(col, other.index, new_index, 1) | ||
for col in other._col_partitions] | ||
|
||
# Append the columns together (i.e. concat) | ||
new_column_parts = new_self + new_other | ||
|
||
# Default index in the case that on is set. | ||
if on is not None: | ||
new_index = None | ||
|
||
# TODO join the two metadata tables for performance. | ||
return DataFrame(col_partitions=new_column_parts, | ||
index=new_index, | ||
columns=new_column_labels) | ||
else: | ||
# This constraint carried over from Pandas. | ||
if on is not None: | ||
raise ValueError("Joining multiple DataFrames only supported" | ||
" for joining on index") | ||
|
||
# Joining the empty DataFrames with either index or columns is | ||
# fast. It gives us proper error checking for the edge cases that | ||
# would otherwise require a lot more logic. | ||
new_index = pd.DataFrame(index=self.index).join( | ||
[pd.DataFrame(index=obj.index) for obj in other], | ||
how=how, sort=sort).index | ||
|
||
new_column_labels = pd.DataFrame(columns=self.columns).join( | ||
[pd.DataFrame(columns=obj.columns) for obj in other], | ||
lsuffix=lsuffix, rsuffix=rsuffix).columns | ||
|
||
new_self = [_reindex_helper.remote(col, self.index, new_index, 1) | ||
for col in self._col_partitions] | ||
|
||
new_others = [_reindex_helper.remote(col, obj.index, new_index, 1) | ||
for obj in other for col in obj._col_partitions] | ||
|
||
# Append the columns together (i.e. concat) | ||
new_column_parts = new_self + new_others | ||
|
||
# TODO join the two metadata tables for performance. | ||
return DataFrame(col_partitions=new_column_parts, | ||
index=new_index, | ||
columns=new_column_labels) | ||
|
||
def kurt(self, axis=None, skipna=None, level=None, numeric_only=None, | ||
**kwargs): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None
objects need to be dropped fromobjs
as specified in pandas docs.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved below