[DataFrame] Fully implement append, concat and join #1932

Merged (7 commits) on Apr 24, 2018
188 changes: 114 additions & 74 deletions python/ray/dataframe/concat.py
@@ -1,90 +1,130 @@
-import pandas as pd
-import numpy as np
-from .dataframe import DataFrame as rdf
-from .utils import (
-    from_pandas,
-    _deploy_func)
-from functools import reduce
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import pandas
+from .dataframe import DataFrame
+from .utils import _reindex_helper


 def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
            keys=None, levels=None, names=None, verify_integrity=False,
            copy=True):
 
-    def _concat(frame1, frame2):
-        # Check type on objects
-        # Case 1: Both are Pandas DF
-        if isinstance(frame1, pd.DataFrame) and \
-                isinstance(frame2, pd.DataFrame):
-
-            return pd.concat((frame1, frame2), axis, join, join_axes,
+    if keys is not None:
+        objs = [objs[k] for k in keys]
+    else:
+        objs = list(objs)
Contributor: None objects need to be dropped from objs as specified in pandas docs.
Member Author: Resolved below.

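For reference, the pandas behavior the reviewer is pointing at, checked with a throwaway frame (not part of this diff): None entries are silently dropped, but an all-None input raises.

import pandas as pd

df = pd.DataFrame({'a': [1, 2]})
print(pd.concat([df, None, df]).shape)  # (4, 1) -- the None entry is dropped
try:
    pd.concat([None, None])
except ValueError as err:
    print(err)  # "All objects passed were None"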

+    if len(objs) == 0:
+        raise ValueError("No objects to concatenate")

Contributor: Need to handle case of ValueError: All objects passed were None
Member Author: Resolved below.

+    objs = [obj for obj in objs if obj is not None]
+
+    if len(objs) == 0:
+        raise ValueError("All objects passed were None")
+
+    try:
+        type_check = next(obj for obj in objs
+                          if not isinstance(obj, (pandas.Series,
+                                                  pandas.DataFrame,
+                                                  DataFrame)))
+    except StopIteration:
+        type_check = None
+    if type_check is not None:
+        raise ValueError("cannot concatenate object of type \"{0}\"; only "
+                         "pandas.Series, pandas.DataFrame, "
+                         "and ray.dataframe.DataFrame objs are "
+                         "valid", type(type_check))
+
+    all_series = all([isinstance(obj, pandas.Series)
+                      for obj in objs])
+    if all_series:
+        return pandas.concat(objs, axis, join, join_axes,
                              ignore_index, keys, levels, names,
                              verify_integrity, copy)

-        if not (isinstance(frame1, rdf) and
-                isinstance(frame2, rdf)) and join == 'inner':
-            raise NotImplementedError(
-                "Obj as dicts not implemented. To contribute to "
-                "Pandas on Ray, please visit github.com/ray-project/ray."
-            )
-
-        # Case 2: Both are different types
-        if isinstance(frame1, pd.DataFrame):
-            frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1)
-        if isinstance(frame2, pd.DataFrame):
-            frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1)
-
-        # Case 3: Both are Ray DF
-        if isinstance(frame1, rdf) and \
-                isinstance(frame2, rdf):
-
-            new_columns = frame1.columns.join(frame2.columns, how=join)
-
-            def _reindex_helper(pdf, old_columns, join):
-                pdf.columns = old_columns
-                if join == 'outer':
-                    pdf = pdf.reindex(columns=new_columns)
-                else:
-                    pdf = pdf[new_columns]
-                pdf.columns = pd.RangeIndex(len(new_columns))
-
-                return pdf
+    if isinstance(objs, dict):
Contributor: This case is not necessary.
Member Author: Why is this case not necessary?
Contributor: Actually it is, my comment was wrong. I had understood that you could only pass in a dictionary with keys specified. Turns out you can pass in a dictionary by itself.
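The behavior being discussed, in plain pandas (illustrative frames, not from this PR): a dict can be passed by itself, and its keys become the outer level of the result's index.

import pandas as pd

parts = {'x': pd.DataFrame({'a': [1]}), 'y': pd.DataFrame({'a': [2]})}
print(pd.concat(parts).index.tolist())  # [('x', 0), ('y', 0)]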

+        raise NotImplementedError(
+            "Obj as dicts not implemented. To contribute to "
+            "Pandas on Ray, please visit github.com/ray-project/ray.")
 
-            f1_columns, f2_columns = frame1.columns, frame2.columns
-            new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p,
-                      f1_columns, join), part) for
-                      part in frame1._row_partitions]
-            new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p,
-                      f2_columns, join), part) for
-                      part in frame2._row_partitions]
+    axis = pandas.DataFrame()._get_axis_number(axis)
 
-            return rdf(row_partitions=new_f1 + new_f2, columns=new_columns,
-                       index=frame1.index.append(frame2.index))
+    if join not in ['inner', 'outer']:
+        raise ValueError("Only can inner (intersect) or outer (union) join the"
+                         " other axis")

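A quick plain-pandas illustration of the two accepted join modes on the other axis (example frames are hypothetical):

import pandas as pd

a = pd.DataFrame({'x': [1], 'y': [2]})
b = pd.DataFrame({'y': [3], 'z': [4]})
print(pd.concat([a, b], join='outer').columns.tolist())  # ['x', 'y', 'z']
print(pd.concat([a, b], join='inner').columns.tolist())  # ['y']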
-    # (TODO) Group all the pandas dataframes
+    # We need this in a list because we use it later.
+    all_index, all_columns = list(zip(*[(obj.index, obj.columns)
+                                        for obj in objs]))

Contributor: This will not work for Panel objects which do not have index or columns properties.
Member Author: True, I'm going to just drop Panel support.

-    if isinstance(objs, dict):
-        raise NotImplementedError(
-            "Obj as dicts not implemented. To contribute to "
-            "Pandas on Ray, please visit github.com/ray-project/ray."
-            )
+    def series_to_df(series, columns):
+        df = pandas.DataFrame(series)
+        df.columns = columns
+        return DataFrame(df)
 
-    axis = pd.DataFrame()._get_axis_number(axis)
-    if axis == 1:
-        raise NotImplementedError(
-            "Concat not implemented for axis=1. To contribute to "
-            "Pandas on Ray, please visit github.com/ray-project/ray."
-            )
-
-    all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs])
-    if all_pd:
-        result = pd.concat(objs, axis, join, join_axes,
-                           ignore_index, keys, levels, names,
-                           verify_integrity, copy)
+    # Pandas puts all of the Series in a single column named 0. This is
+    # true regardless of the existence of another column named 0 in the
+    # concat.
+    if axis == 0:
+        objs = [series_to_df(obj, [0])
+                if isinstance(obj, pandas.Series) else obj for obj in objs]
     else:
-        result = reduce(_concat, objs)
+        # Pandas starts the count at 0 so this will increment the names as
+        # long as there's a new nameless Series being added.
+        def name_incrementer(i):
+            val = i[0]
+            i[0] += 1
+            return val
+
+        i = [0]
+        objs = [series_to_df(obj, obj.name if obj.name is not None
+                             else name_incrementer(i))
+                if isinstance(obj, pandas.Series) else obj for obj in objs]
+
+    # Using concat on the columns and index is fast because they're empty,
+    # and it forces the error checking. It also puts the columns in the
+    # correct order for us.
+    final_index = \
+        pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
+                      axis=axis, join=join, join_axes=join_axes,
+                      ignore_index=ignore_index, keys=keys, levels=levels,
+                      names=names, verify_integrity=verify_integrity,
+                      copy=False).index
+    final_columns = \
+        pandas.concat([pandas.DataFrame(columns=col)
+                       for col in all_columns],
+                      axis=axis, join=join, join_axes=join_axes,
+                      ignore_index=ignore_index, keys=keys, levels=levels,
+                      names=names, verify_integrity=verify_integrity,
+                      copy=False).columns
+
+    # Put all of the DataFrames into Ray format
+    # TODO just partition the DataFrames instead of building a new Ray DF.
+    objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame,
+                                               pandas.Series)) else obj
+            for obj in objs]

Contributor: All pandas.Series objects would already be DataFrames by this point. Does it make sense to combine the steps?
Member Author: Yes, it's not completely efficient to do it this way.
Member Author: Resolved in series_to_df.
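The Series-naming rules series_to_df mirrors, checked in plain pandas (example data is made up):

import pandas as pd

s_named = pd.Series([1], name='x')
s_anon = pd.Series([2])
# axis=0: every Series lands in a single column named 0.
print(pd.concat([s_anon], axis=0).to_frame().columns.tolist())  # [0]
# axis=1: named Series keep their name; unnamed ones are numbered 0, 1, ...
print(pd.concat([s_named, s_anon], axis=1).columns.tolist())  # ['x', 0]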

+    # Here we reuse all_columns/index so we don't have to materialize objects
+    # from remote memory built in the previous line. In the future, we won't be
+    # building new DataFrames, rather just partitioning the DataFrames.
+    if axis == 0:
+        new_rows = [_reindex_helper.remote(part, all_columns[i],
+                                           final_columns, axis)
+                    for i in range(len(objs))
+                    for part in objs[i]._row_partitions]
+
+        return DataFrame(row_partitions=new_rows,
+                         columns=final_columns,
+                         index=final_index)
 
-    if isinstance(result, pd.DataFrame):
-        return from_pandas(result, len(result) / 2**16 + 1)
-
-    return result
+    else:
+        new_columns = [_reindex_helper.remote(part, all_index[i],
+                                              final_index, axis)
+                       for i in range(len(objs))
+                       for part in objs[i]._col_partitions]
+
+        return DataFrame(col_partitions=new_columns,
+                         columns=final_columns,
+                         index=final_index)
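A usage sketch of the rewritten concat; it assumes concat and DataFrame are importable from ray.dataframe as in this PR's tree, with made-up data:

import pandas as pd
from ray.dataframe import DataFrame, concat

left = DataFrame(pd.DataFrame({'a': [1, 2], 'b': [3, 4]}))
right = pd.DataFrame({'b': [5], 'c': [6]})

# Mixed Ray/pandas inputs are allowed; join='outer' unions the columns.
result = concat([left, right], axis=0, join='outer')
print(result.columns.tolist())  # ['a', 'b', 'c']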
137 changes: 130 additions & 7 deletions python/ray/dataframe/dataframe.py
@@ -35,7 +35,8 @@
     _blocks_to_col,
     _blocks_to_row,
     _create_block_partitions,
-    _inherit_docstrings)
+    _inherit_docstrings,
+    _reindex_helper)
 from . import get_npartitions
 from .index_metadata import _IndexMetadata

@@ -911,9 +912,49 @@ def remote_func(df):
         return self._arithmetic_helper(remote_func, axis, level)
 
     def append(self, other, ignore_index=False, verify_integrity=False):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Append another DataFrame/list/Series to this one.
+
+        Args:
+            other: The object to append to this.
+            ignore_index: Ignore the index on appending.
+            verify_integrity: Verify the integrity of the index on completion.
+
+        Returns:
+            A new DataFrame containing the concatenated values.
+        """
+        if isinstance(other, (pd.Series, dict)):
+            if isinstance(other, dict):
+                other = pd.Series(other)
+            if other.name is None and not ignore_index:
+                raise TypeError('Can only append a Series if ignore_index=True'
+                                ' or if the Series has a name')
+
+            if other.name is None:
+                index = None
+            else:
+                # other must have the same index name as self, otherwise
+                # index name will be reset
+                index = pd.Index([other.name], name=self.index.name)
+
+            combined_columns = self.columns.tolist() + self.columns.union(
+                other.index).difference(self.columns).tolist()
+            other = other.reindex(combined_columns, copy=False)
+            other = pd.DataFrame(other.values.reshape((1, len(other))),
+                                 index=index,
+                                 columns=combined_columns)
+            other = other._convert(datetime=True, timedelta=True)
Contributor: Does the current DataFrame here need to reindex its columns to the combined_columns?
Member Author: This will happen in concat.

+        elif isinstance(other, list) and not isinstance(other[0], DataFrame):
+            other = pd.DataFrame(other)
+            if (self.columns.get_indexer(other.columns) >= 0).all():
+                other = other.loc[:, self.columns]
+
+        from .concat import concat
+        if isinstance(other, (list, tuple)):
+            to_concat = [self] + other
+        else:
+            to_concat = [self, other]
+        return concat(to_concat, ignore_index=ignore_index,
+                      verify_integrity=verify_integrity)
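A small sketch of the new append path (hypothetical data; assumes DataFrame is importable as below):

import pandas as pd
from ray.dataframe import DataFrame

df = DataFrame(pd.DataFrame({'a': [1, 2]}))

# A named Series appends as a single row whose label is the Series name.
print(df.append(pd.Series({'a': 3}, name=2)).index.tolist())  # [0, 1, 2]

# A dict is wrapped in an unnamed Series, so ignore_index=True is required.
df.append({'a': 4}, ignore_index=True)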

     def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
               args=(), **kwds):
@@ -2028,9 +2069,91 @@ def _replace_index(row_tuple, idx):

     def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
              sort=False):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Join two or more DataFrames, or a DataFrame with a collection.
+
+        Args:
+            other: What to join this DataFrame with.
+            on: A column name to use from the left for the join.
+            how: What type of join to conduct.
+            lsuffix: The suffix to add to column names that match on left.
+            rsuffix: The suffix to add to column names that match on right.
+            sort: Whether or not to sort.
+
+        Returns:
+            The joined DataFrame.
+        """
+
+        if on is not None:
+            raise NotImplementedError("Not yet.")
+
+        if isinstance(other, pd.Series):
+            if other.name is None:
+                raise ValueError("Other Series must have a name")
+            other = DataFrame({other.name: other})
Contributor: Can pass other directly into DataFrame constructor. It carries the series name over to the column name.
Member Author: This is similar to how Pandas does it, so I vote we keep it this way. It's probably for clarity.

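The reviewer's point, checked in plain pandas (throwaway Series): both spellings carry the name over to the column.

import pandas as pd

s = pd.Series([1, 2], name='score')
print(pd.DataFrame(s).columns.tolist())  # ['score'] -- name carries over directly
print(pd.DataFrame({s.name: s}).columns.tolist())  # ['score'] -- the form kept here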

+        if isinstance(other, DataFrame):
+            if on is not None:
+                index = self[on]
+            else:
+                index = self.index
+
+            new_index = index.join(other.index, how=how, sort=sort)
+
+            # Joining two empty DataFrames is fast, and error checks for us.
+            new_column_labels = pd.DataFrame(columns=self.columns) \
+                .join(pd.DataFrame(columns=other.columns),
+                      lsuffix=lsuffix, rsuffix=rsuffix).columns
+
+            # Join is a concat once we have shuffled the data internally.
+            # We shuffle the data by computing the correct order.
+            # Another important thing to note: We set the current self index
+            # to the index variable which may be 'on'.
+            new_self = [_reindex_helper.remote(col, index, new_index, 1)
+                        for col in self._col_partitions]
+            new_other = [_reindex_helper.remote(col, other.index, new_index, 1)
+                         for col in other._col_partitions]
+
+            # Append the columns together (i.e. concat)
+            new_column_parts = new_self + new_other
+
+            # Default index in the case that on is set.
+            if on is not None:
+                new_index = None
+
+            # TODO join the two metadata tables for performance.
+            return DataFrame(col_partitions=new_column_parts,
+                             index=new_index,
+                             columns=new_column_labels)
+        else:
+            # This constraint carried over from Pandas.
+            if on is not None:
+                raise ValueError("Joining multiple DataFrames only supported"
+                                 " for joining on index")
+
+            # Joining the empty DataFrames with either index or columns is
+            # fast. It gives us proper error checking for the edge cases that
+            # would otherwise require a lot more logic.
+            new_index = pd.DataFrame(index=self.index).join(
+                [pd.DataFrame(index=obj.index) for obj in other],
+                how=how, sort=sort).index
+
+            new_column_labels = pd.DataFrame(columns=self.columns).join(
+                [pd.DataFrame(columns=obj.columns) for obj in other],
+                lsuffix=lsuffix, rsuffix=rsuffix).columns
+
+            new_self = [_reindex_helper.remote(col, self.index, new_index, 1)
+                        for col in self._col_partitions]
+
+            new_others = [_reindex_helper.remote(col, obj.index, new_index, 1)
+                          for obj in other for col in obj._col_partitions]
+
+            # Append the columns together (i.e. concat)
+            new_column_parts = new_self + new_others
+
+            # TODO join the two metadata tables for performance.
+            return DataFrame(col_partitions=new_column_parts,
+                             index=new_index,
+                             columns=new_column_labels)
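The empty-frame trick used above for label computation and error checking, in plain pandas (illustrative columns):

import pandas as pd

left = pd.DataFrame(columns=['a', 'b'])
right = pd.DataFrame(columns=['b', 'c'])
# No data moves; we get the combined labels plus pandas' own error checking
# (overlapping names without suffixes would raise ValueError).
print(left.join(right, lsuffix='_l', rsuffix='_r').columns.tolist())
# ['a', 'b_l', 'b_r', 'c']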

     def kurt(self, axis=None, skipna=None, level=None, numeric_only=None,
              **kwargs):