[DataFrame] Fully implement append, concat and join (ray-project#1932)
devin-petersohn authored and robertnishihara committed Apr 24, 2018
1 parent 29c36f2 commit 1d1df7b
Showing 5 changed files with 362 additions and 123 deletions.
188 changes: 114 additions & 74 deletions python/ray/dataframe/concat.py
@@ -1,90 +1,130 @@
-import pandas as pd
-import numpy as np
-from .dataframe import DataFrame as rdf
-from .utils import (
-    from_pandas,
-    _deploy_func)
-from functools import reduce
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import pandas
+from .dataframe import DataFrame
+from .utils import _reindex_helper


def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
           keys=None, levels=None, names=None, verify_integrity=False,
           copy=True):

-    def _concat(frame1, frame2):
-        # Check type on objects
-        # Case 1: Both are Pandas DF
-        if isinstance(frame1, pd.DataFrame) and \
-                isinstance(frame2, pd.DataFrame):
-
-            return pd.concat((frame1, frame2), axis, join, join_axes,
-                             ignore_index, keys, levels, names,
-                             verify_integrity, copy)
-
-        if not (isinstance(frame1, rdf) and
-                isinstance(frame2, rdf)) and join == 'inner':
-            raise NotImplementedError(
-                "Obj as dicts not implemented. To contribute to "
-                "Pandas on Ray, please visit github.com/ray-project/ray."
-            )
-
-        # Case 2: Both are different types
-        if isinstance(frame1, pd.DataFrame):
-            frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1)
-        if isinstance(frame2, pd.DataFrame):
-            frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1)
-
-        # Case 3: Both are Ray DF
-        if isinstance(frame1, rdf) and \
-                isinstance(frame2, rdf):
-
-            new_columns = frame1.columns.join(frame2.columns, how=join)
-
-            def _reindex_helper(pdf, old_columns, join):
-                pdf.columns = old_columns
-                if join == 'outer':
-                    pdf = pdf.reindex(columns=new_columns)
-                else:
-                    pdf = pdf[new_columns]
-                pdf.columns = pd.RangeIndex(len(new_columns))
-
-                return pdf
-
-            f1_columns, f2_columns = frame1.columns, frame2.columns
-            new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p,
-                      f1_columns, join), part) for
-                      part in frame1._row_partitions]
-            new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p,
-                      f2_columns, join), part) for
-                      part in frame2._row_partitions]
-
-            return rdf(row_partitions=new_f1 + new_f2, columns=new_columns,
-                       index=frame1.index.append(frame2.index))
-
-    # (TODO) Group all the pandas dataframes
-
-    if isinstance(objs, dict):
-        raise NotImplementedError(
-            "Obj as dicts not implemented. To contribute to "
-            "Pandas on Ray, please visit github.com/ray-project/ray."
-        )
-
-    axis = pd.DataFrame()._get_axis_number(axis)
-    if axis == 1:
-        raise NotImplementedError(
-            "Concat not implemented for axis=1. To contribute to "
-            "Pandas on Ray, please visit github.com/ray-project/ray."
-        )
-
-    all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs])
-    if all_pd:
-        result = pd.concat(objs, axis, join, join_axes,
-                           ignore_index, keys, levels, names,
-                           verify_integrity, copy)
-    else:
-        result = reduce(_concat, objs)
-
-    if isinstance(result, pd.DataFrame):
-        return from_pandas(result, len(result) / 2**16 + 1)
-
-    return result
+    if keys is not None:
+        objs = [objs[k] for k in keys]
+    else:
+        objs = list(objs)
+
+    if len(objs) == 0:
+        raise ValueError("No objects to concatenate")
+
+    objs = [obj for obj in objs if obj is not None]
+
+    if len(objs) == 0:
+        raise ValueError("All objects passed were None")
+
+    try:
+        type_check = next(obj for obj in objs
+                          if not isinstance(obj, (pandas.Series,
+                                                  pandas.DataFrame,
+                                                  DataFrame)))
+    except StopIteration:
+        type_check = None
+    if type_check is not None:
+        raise ValueError("cannot concatenate object of type \"{0}\"; only "
+                         "pandas.Series, pandas.DataFrame, "
+                         "and ray.dataframe.DataFrame objs are "
+                         "valid", type(type_check))
+
+    all_series = all([isinstance(obj, pandas.Series)
+                      for obj in objs])
+    if all_series:
+        return pandas.concat(objs, axis, join, join_axes,
+                             ignore_index, keys, levels, names,
+                             verify_integrity, copy)
+
+    if isinstance(objs, dict):
+        raise NotImplementedError(
+            "Obj as dicts not implemented. To contribute to "
+            "Pandas on Ray, please visit github.com/ray-project/ray.")
+
+    axis = pandas.DataFrame()._get_axis_number(axis)
+
+    if join not in ['inner', 'outer']:
+        raise ValueError("Only can inner (intersect) or outer (union) join the"
+                         " other axis")
+
+    # We need this in a list because we use it later.
+    all_index, all_columns = list(zip(*[(obj.index, obj.columns)
+                                        for obj in objs]))
+
+    def series_to_df(series, columns):
+        df = pandas.DataFrame(series)
+        df.columns = columns
+        return DataFrame(df)
+
+    # Pandas puts all of the Series in a single column named 0. This is
+    # true regardless of the existence of another column named 0 in the
+    # concat.
+    if axis == 0:
+        objs = [series_to_df(obj, [0])
+                if isinstance(obj, pandas.Series) else obj for obj in objs]
+    else:
+        # Pandas starts the count at 0 so this will increment the names as
+        # long as there's a new nameless Series being added.
+        def name_incrementer(i):
+            val = i[0]
+            i[0] += 1
+            return val
+
+        i = [0]
+        objs = [series_to_df(obj, obj.name if obj.name is not None
+                             else name_incrementer(i))
+                if isinstance(obj, pandas.Series) else obj for obj in objs]
+
+    # Using concat on the columns and index is fast because they're empty,
+    # and it forces the error checking. It also puts the columns in the
+    # correct order for us.
+    final_index = \
+        pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
+                      axis=axis, join=join, join_axes=join_axes,
+                      ignore_index=ignore_index, keys=keys, levels=levels,
+                      names=names, verify_integrity=verify_integrity,
+                      copy=False).index
+    final_columns = \
+        pandas.concat([pandas.DataFrame(columns=col)
+                       for col in all_columns],
+                      axis=axis, join=join, join_axes=join_axes,
+                      ignore_index=ignore_index, keys=keys, levels=levels,
+                      names=names, verify_integrity=verify_integrity,
+                      copy=False).columns
+
+    # Put all of the DataFrames into Ray format
+    # TODO just partition the DataFrames instead of building a new Ray DF.
+    objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame,
+                                               pandas.Series)) else obj
+            for obj in objs]
+
+    # Here we reuse all_columns/index so we don't have to materialize objects
+    # from remote memory built in the previous line. In the future, we won't
+    # be building new DataFrames, rather just partitioning the DataFrames.
+    if axis == 0:
+        new_rows = [_reindex_helper.remote(part, all_columns[i],
+                                           final_columns, axis)
+                    for i in range(len(objs))
+                    for part in objs[i]._row_partitions]
+
+        return DataFrame(row_partitions=new_rows,
+                         columns=final_columns,
+                         index=final_index)
+    else:
+        new_columns = [_reindex_helper.remote(part, all_index[i],
+                                              final_index, axis)
+                       for i in range(len(objs))
+                       for part in objs[i]._col_partitions]
+
+        return DataFrame(col_partitions=new_columns,
+                         columns=final_columns,
+                         index=final_index)
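For readers who want to try the new code path, here is a minimal usage sketch of the reimplemented concat. It assumes a commit-era Pandas on Ray checkout where the package is importable as ray.dataframe and exports concat; the names left/right/outer/inner/wide are illustrative, not from this commit.

import pandas
import ray
import ray.dataframe as rdf  # package path as of this commit, not today's Modin

ray.init()

left = pandas.DataFrame({'a': [1, 2], 'b': [3, 4]})
right = pandas.DataFrame({'b': [5, 6], 'c': [7, 8]})

# Row-wise concat: 'outer' unions the columns and fills gaps with NaN.
outer = rdf.concat([left, right], axis=0, join='outer')

# 'inner' keeps only the shared column 'b', per the join check above.
inner = rdf.concat([left, right], axis=0, join='inner')

# axis=1 no longer raises NotImplementedError: column partitions are
# realigned on the index instead of the columns.
wide = rdf.concat([left, right], axis=1, join='outer')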
137 changes: 130 additions & 7 deletions python/ray/dataframe/dataframe.py
@@ -35,7 +35,8 @@
    _blocks_to_col,
    _blocks_to_row,
    _create_block_partitions,
-    _inherit_docstrings)
+    _inherit_docstrings,
+    _reindex_helper)
from . import get_npartitions
from .index_metadata import _IndexMetadata

@@ -911,9 +912,49 @@ def remote_func(df):
        return self._arithmetic_helper(remote_func, axis, level)

    def append(self, other, ignore_index=False, verify_integrity=False):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Append another DataFrame/list/Series to this one.
+
+        Args:
+            other: The object to append to this.
+            ignore_index: Ignore the index on appending.
+            verify_integrity: Verify the integrity of the index on completion.
+
+        Returns:
+            A new DataFrame containing the concatenated values.
+        """
+        if isinstance(other, (pd.Series, dict)):
+            if isinstance(other, dict):
+                other = pd.Series(other)
+            if other.name is None and not ignore_index:
+                raise TypeError('Can only append a Series if ignore_index=True'
+                                ' or if the Series has a name')
+
+            if other.name is None:
+                index = None
+            else:
+                # other must have the same index name as self, otherwise
+                # index name will be reset
+                index = pd.Index([other.name], name=self.index.name)
+
+            combined_columns = self.columns.tolist() + self.columns.union(
+                other.index).difference(self.columns).tolist()
+            other = other.reindex(combined_columns, copy=False)
+            other = pd.DataFrame(other.values.reshape((1, len(other))),
+                                 index=index,
+                                 columns=combined_columns)
+            other = other._convert(datetime=True, timedelta=True)
+        elif isinstance(other, list) and not isinstance(other[0], DataFrame):
+            other = pd.DataFrame(other)
+            if (self.columns.get_indexer(other.columns) >= 0).all():
+                other = other.loc[:, self.columns]
+
+        from .concat import concat
+        if isinstance(other, (list, tuple)):
+            to_concat = [self] + other
+        else:
+            to_concat = [self, other]
+        return concat(to_concat, ignore_index=ignore_index,
+                      verify_integrity=verify_integrity)

    def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
              args=(), **kwds):
@@ -2028,9 +2069,91 @@ def _replace_index(row_tuple, idx):

    def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
             sort=False):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
"""Join two or more DataFrames, or a DataFrame with a collection.
Args:
other: What to join this DataFrame with.
on: A column name to use from the left for the join.
how: What type of join to conduct.
lsuffix: The suffix to add to column names that match on left.
rsuffix: The suffix to add to column names that match on right.
sort: Whether or not to sort.
Returns:
The joined DataFrame.
"""

if on is not None:
raise NotImplementedError("Not yet.")

if isinstance(other, pd.Series):
if other.name is None:
raise ValueError("Other Series must have a name")
other = DataFrame({other.name: other})

if isinstance(other, DataFrame):
if on is not None:
index = self[on]
else:
index = self.index

new_index = index.join(other.index, how=how, sort=sort)

# Joining two empty DataFrames is fast, and error checks for us.
new_column_labels = pd.DataFrame(columns=self.columns) \
.join(pd.DataFrame(columns=other.columns),
lsuffix=lsuffix, rsuffix=rsuffix).columns

# Join is a concat once we have shuffled the data internally.
# We shuffle the data by computing the correct order.
# Another important thing to note: We set the current self index
# to the index variable which may be 'on'.
new_self = [_reindex_helper.remote(col, index, new_index, 1)
for col in self._col_partitions]
new_other = [_reindex_helper.remote(col, other.index, new_index, 1)
for col in other._col_partitions]

# Append the columns together (i.e. concat)
new_column_parts = new_self + new_other

# Default index in the case that on is set.
if on is not None:
new_index = None

# TODO join the two metadata tables for performance.
return DataFrame(col_partitions=new_column_parts,
index=new_index,
columns=new_column_labels)
else:
# This constraint carried over from Pandas.
if on is not None:
raise ValueError("Joining multiple DataFrames only supported"
" for joining on index")

# Joining the empty DataFrames with either index or columns is
# fast. It gives us proper error checking for the edge cases that
# would otherwise require a lot more logic.
new_index = pd.DataFrame(index=self.index).join(
[pd.DataFrame(index=obj.index) for obj in other],
how=how, sort=sort).index

new_column_labels = pd.DataFrame(columns=self.columns).join(
[pd.DataFrame(columns=obj.columns) for obj in other],
lsuffix=lsuffix, rsuffix=rsuffix).columns

new_self = [_reindex_helper.remote(col, self.index, new_index, 1)
for col in self._col_partitions]

new_others = [_reindex_helper.remote(col, obj.index, new_index, 1)
for obj in other for col in obj._col_partitions]

# Append the columns together (i.e. concat)
new_column_parts = new_self + new_others

# TODO join the two metadata tables for performance.
return DataFrame(col_partitions=new_column_parts,
index=new_index,
columns=new_column_labels)

    def kurt(self, axis=None, skipna=None, level=None, numeric_only=None,
             **kwargs):
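Both files route realignment through _reindex_helper, which this commit adds to .utils but whose body does not appear in the hunks rendered above. Judging from its call sites — _reindex_helper.remote(partition, old_labels, new_labels, axis) — and the inline helper it replaces in the old concat.py, a plausible sketch is the following; treat it as a hypothetical reconstruction, not the committed code.

import pandas
import ray


@ray.remote
def _reindex_helper(df, old_index, new_index, axis):
    # Hypothetical reconstruction. Partitions carry positional labels
    # internally, so restore the real labels, realign to the final
    # labels, then re-number so all partitions stay uniform.
    if axis == 0:
        # Row partitions must agree on columns before rows are stacked.
        df.columns = old_index
        df = df.reindex(columns=new_index)
        df.columns = pandas.RangeIndex(len(new_index))
    else:
        # Column partitions must agree on the index before columns are
        # appended; join above always passes axis=1 for this reason.
        df.index = old_index
        df = df.reindex(index=new_index)
        df.index = pandas.RangeIndex(len(new_index))
    return df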

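Finally, a quick sketch of the two DataFrame methods this commit unlocks, under the same commit-era assumptions as the concat example above; constructing a ray.dataframe.DataFrame from a pandas.DataFrame mirrors what concat itself does internally.

import pandas
import ray
import ray.dataframe as rdf

ray.init()

df = rdf.DataFrame(pandas.DataFrame({'a': [1, 2], 'b': [3, 4]}))
extra = rdf.DataFrame(pandas.DataFrame({'a': [5], 'b': [6]}))

# append now routes through concat (note the deferred import inside it).
appended = df.append(extra, ignore_index=True)

# join realigns both frames' column partitions to the joined index and
# then concatenates the partition lists.
left = rdf.DataFrame(pandas.DataFrame({'x': [1, 2]}, index=['a', 'b']))
right = rdf.DataFrame(pandas.DataFrame({'y': [3, 4]}, index=['b', 'c']))
joined = left.join(right, how='outer')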