Skip to content

Commit

Permalink
[DataFrame] Implementing API correct groupby with aggregation methods (
Browse files Browse the repository at this point in the history
  • Loading branch information
devin-petersohn authored and robertnishihara committed Apr 22, 2018
1 parent 8264e64 commit 8f59546
Show file tree
Hide file tree
Showing 4 changed files with 609 additions and 159 deletions.
255 changes: 225 additions & 30 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pandas.core.index import _ensure_index_from_sequences
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas import compat
from pandas.compat import lzip
import pandas.core.common as com
from pandas.core.dtypes.common import (
Expand All @@ -25,6 +26,7 @@
import sys
import re

from .groupby import DataFrameGroupBy
from .utils import (
_deploy_func,
_map_partitions,
Expand Down Expand Up @@ -90,7 +92,6 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
axis = 0
columns = pd_df.columns
index = pd_df.index
self._row_metadata = self._col_metadata = None
else:
# created this invariant to make sure we never have to go into the
# partitions to get the columns
Expand All @@ -101,28 +102,25 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,
if block_partitions is not None:
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
if row_metadata is not None:
self._row_metadata = row_metadata.copy()
if col_metadata is not None:
self._col_metadata = col_metadata.copy()
assert self._block_partitions.ndim == 2, \
"Block Partitions must be 2D."
else:
if row_partitions is not None:
axis = 0
partitions = row_partitions
if row_metadata is not None:
self._row_metadata = row_metadata.copy()
elif col_partitions is not None:
axis = 1
partitions = col_partitions
if col_metadata is not None:
self._col_metadata = col_metadata.copy()

self._block_partitions = \
_create_block_partitions(partitions, axis=axis,
length=len(columns))

if row_metadata is not None:
self._row_metadata = row_metadata.copy()
if col_metadata is not None:
self._col_metadata = col_metadata.copy()

# Sometimes we only get a single column or row, which is
# problematic for building blocks from the partitions, so we
# add whatever dimension we're missing from the input.
Expand All @@ -132,10 +130,10 @@ def __init__(self, data=None, index=None, columns=None, dtype=None,

# Create the row and column index objects for using our partitioning.
# If the objects haven't been inherited, then generate them
if not self._row_metadata:
if self._row_metadata is None:
self._row_metadata = _IndexMetadata(self._block_partitions[:, 0],
index=index, axis=0)
if not self._col_metadata:
if self._col_metadata is None:
self._col_metadata = _IndexMetadata(self._block_partitions[0, :],
index=columns, axis=1)

Expand Down Expand Up @@ -562,9 +560,23 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
Returns:
A new DataFrame resulting from the groupby.
"""
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
axis = pd.DataFrame()._get_axis_number(axis)
if callable(by):
by = by(self.index)
elif isinstance(by, compat.string_types):
by = self.__getitem__(by).values.tolist()
elif is_list_like(by):
mismatch = len(by) != len(self) if axis == 0 \
else len(by) != len(self.columns)

if all([obj in self for obj in by]) and mismatch:
raise NotImplementedError(
"Groupby with lists of columns not yet supported.")
elif mismatch:
raise KeyError(next(x for x in by if x not in self))

return DataFrameGroupBy(self, by, axis, level, as_index, sort,
group_keys, squeeze, **kwargs)

def sum(self, axis=None, skipna=True, level=None, numeric_only=None):
"""Perform a sum across the DataFrame.
Expand Down Expand Up @@ -635,7 +647,9 @@ def isna(self):

return DataFrame(block_partitions=new_block_partitions,
columns=self.columns,
index=self.index)
index=self.index,
row_metadata=self._row_metadata,
col_metadata=self._col_metadata)

def isnull(self):
"""Fill a DataFrame with booleans for cells containing a null value.
Expand Down Expand Up @@ -704,14 +718,162 @@ def add(self, other, axis='columns', level=None, fill_value=None):
"github.com/ray-project/ray.")

def agg(self, func, axis=0, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
return self.aggregate(func, axis, *args, **kwargs)

def aggregate(self, func, axis=0, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
axis = pd.DataFrame()._get_axis_number(axis)

result = None

if axis == 0:
try:
result = self._aggregate(func, axis=axis, *args, **kwargs)
except TypeError:
pass

if result is None:
kwargs.pop('is_transform', None)
return self.apply(func, axis=axis, args=args, **kwargs)

return result

def _aggregate(self, arg, *args, **kwargs):
_axis = kwargs.pop('_axis', None)
if _axis is None:
_axis = getattr(self, 'axis', 0)
kwargs.pop('_level', None)

if isinstance(arg, compat.string_types):
return self._string_function(arg, *args, **kwargs)

# Dictionaries have complex behavior because they can be renamed here.
elif isinstance(arg, dict):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
elif is_list_like(arg):
from .concat import concat

x = [self._aggregate(func, *args, **kwargs)
for func in arg]

new_dfs = [x[i] if not isinstance(x[i], pd.Series)
else pd.DataFrame(x[i], columns=[arg[i]]).T
for i in range(len(x))]

return concat(new_dfs)
elif callable(arg):
self._callable_function(arg, _axis, *args, **kwargs)
else:
# TODO Make pandas error
raise ValueError("type {} is not callable".format(type(arg)))

def _string_function(self, func, *args, **kwargs):
assert isinstance(func, compat.string_types)

f = getattr(self, func, None)

if f is not None:
if callable(f):
return f(*args, **kwargs)

assert len(args) == 0
assert len([kwarg
for kwarg in kwargs
if kwarg not in ['axis', '_level']]) == 0
return f

f = getattr(np, func, None)
if f is not None:
raise NotImplementedError("Numpy aggregates not yet supported.")

raise ValueError("{} is an unknown string function".format(func))

def _callable_function(self, func, axis, *args, **kwargs):
if axis == 0:
partitions = self._col_partitions
else:
partitions = self._row_partitions

if axis == 1:
kwargs['axis'] = axis
kwargs['temp_columns'] = self.columns
else:
kwargs['temp_index'] = self.index

def agg_helper(df, arg, *args, **kwargs):
if 'temp_index' in kwargs:
df.index = kwargs.pop('temp_index', None)
else:
df.columns = kwargs.pop('temp_columns', None)
is_transform = kwargs.pop('is_transform', False)
new_df = df.agg(arg, *args, **kwargs)

is_series = False

if isinstance(new_df, pd.Series):
is_series = True
index = None
columns = None
else:
index = new_df.index \
if not isinstance(new_df.index, pd.RangeIndex) \
else None
columns = new_df.columns
new_df.columns = pd.RangeIndex(0, len(new_df.columns))
new_df.reset_index(drop=True, inplace=True)

if is_transform:
if is_scalar(new_df) or len(new_df) != len(df):
raise ValueError("transforms cannot produce "
"aggregated results")

return is_series, new_df, index, columns

remote_result = \
[_deploy_func._submit(args=(lambda df: agg_helper(df,
func,
*args,
**kwargs),
part), num_return_vals=4)
for part in partitions]

# This magic transposes the list comprehension returned from remote
is_series, new_parts, index, columns = \
[list(t) for t in zip(*remote_result)]

# This part is because agg can allow returning a Series or a
# DataFrame, and we have to determine which here. Shouldn't add
# too much to latency in either case because the booleans can
# be returned immediately
is_series = ray.get(is_series)
if all(is_series):
new_series = pd.concat(ray.get(new_parts))
new_series.index = self.columns if axis == 0 else self.index
return new_series
# This error is thrown when some of the partitions return Series and
# others return DataFrames. We do not allow mixed returns.
elif any(is_series):
raise ValueError("no results.")
# The remaining logic executes when we have only DataFrames in the
# remote objects. We build a Ray DataFrame from the Pandas partitions.
elif axis == 0:
new_index = ray.get(index[0])
columns = ray.get(columns)
columns = columns[0].append(columns[1:])

return DataFrame(col_partitions=new_parts,
columns=columns,
index=self.index if new_index is None
else new_index)
else:
new_index = ray.get(index[0])
columns = ray.get(columns)
columns = columns[0].append(columns[1:])
return DataFrame(row_partitions=new_parts,
columns=columns,
index=self.index if new_index is None
else new_index)

def align(self, other, join='outer', axis=None, level=None, copy=True,
fill_value=None, method=None, limit=None, fill_axis=0,
Expand Down Expand Up @@ -755,9 +917,38 @@ def append(self, other, ignore_index=False, verify_integrity=False):

def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
args=(), **kwds):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Apply a function along input axis of DataFrame.
Args:
func: The function to apply
axis: The axis over which to apply the func.
broadcast: Whether or not to broadcast.
raw: Whether or not to convert to a Series.
reduce: Whether or not to try to apply reduction procedures.
Returns:
Series or DataFrame, depending on func.
"""
axis = pd.DataFrame()._get_axis_number(axis)

if is_list_like(func) and not all([isinstance(obj, str)
for obj in func]):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

if axis == 0 and is_list_like(func):
return self.aggregate(func, axis, *args, **kwds)
if isinstance(func, compat.string_types):
if axis == 1:
kwds['axis'] = axis
return getattr(self, func)(*args, **kwds)
elif callable(func):
return self._callable_function(func, axis=axis, *args, **kwds)
else:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")

def as_blocks(self, copy=True):
raise NotImplementedError(
Expand Down Expand Up @@ -2879,9 +3070,14 @@ def to_xarray(self):
"github.com/ray-project/ray.")

def transform(self, func, *args, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
kwargs["is_transform"] = True
result = self.agg(func, *args, **kwargs)
try:
result.columns = self.columns
result.index = self.index
except ValueError:
raise ValueError("transforms cannot produce aggregated results")
return result

def truediv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
Expand Down Expand Up @@ -3135,9 +3331,8 @@ def __round__(self, decimals=0):
"github.com/ray-project/ray.")

def __array__(self, dtype=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
# TODO: This is very inefficient and needs fix
return np.array(to_pandas(self))

def __array_wrap__(self, result, context=None):
raise NotImplementedError(
Expand Down
Loading

0 comments on commit 8f59546

Please sign in to comment.