Skip to content

Commit

Permalink
Adding series and a way to validate our API. (ray-project#1435)
Browse files Browse the repository at this point in the history
* Adding series and a way to validate our API.

* Moving partitions into protected status
  • Loading branch information
devin-petersohn authored and robertnishihara committed Jan 22, 2018
1 parent 74718ef commit 4aca016
Show file tree
Hide file tree
Showing 7 changed files with 3,264 additions and 37 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ script:
- python test/trial_runner_test.py
- python test/trial_scheduler_test.py
- python test/cython_test.py
- python -m pytest test/dataframe.py
- python -m pytest python/ray/dataframe/test/test_dataframe.py
- python -m pytest python/ray/dataframe/test/test_series.py

- python -m pytest python/ray/rllib/test/test_catalog.py
- python -m pytest python/ray/rllib/test/test_filters.py
Expand Down
3 changes: 2 additions & 1 deletion python/ray/dataframe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from .dataframe import DataFrame
from .dataframe import from_pandas
from .dataframe import to_pandas
from .series import Series
import ray
import pandas as pd

__all__ = ["DataFrame", "from_pandas", "to_pandas"]
__all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"]

ray.register_custom_serializer(pd.DataFrame, use_pickle=True)
ray.register_custom_serializer(pd.core.indexes.base.Index, use_pickle=True)
171 changes: 137 additions & 34 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ def __init__(self, df, columns):
"""
assert(len(df) > 0)

self.df = df
self._df = df
self.columns = columns

def __str__(self):
return str(pd.concat(ray.get(self.df)))
return "ray.DataFrame object"

def __repr__(self):
return str(pd.concat(ray.get(self.df)))
return "ray.DataFrame object"

@property
def index(self):
Expand All @@ -35,7 +35,7 @@ def index(self):
Returns:
The union of all indexes across the partitions.
"""
indices = ray.get(self.map_partitions(lambda df: df.index).df)
indices = ray.get(self._map_partitions(lambda df: df.index)._df)
return indices[0].append(indices[1:])

@property
Expand All @@ -45,7 +45,7 @@ def size(self):
Returns:
The number of elements in the DataFrame.
"""
sizes = ray.get(self.map_partitions(lambda df: df.size).df)
sizes = ray.get(self._map_partitions(lambda df: df.size)._df)
return sum(sizes)

@property
Expand All @@ -57,7 +57,7 @@ def ndim(self):
"""
# The number of dimensions is common across all partitions.
# The first partition will be enough.
return ray.get(_deploy_func.remote(lambda df: df.ndim, self.df[0]))
return ray.get(_deploy_func.remote(lambda df: df.ndim, self._df[0]))

@property
def ftypes(self):
Expand All @@ -68,7 +68,7 @@ def ftypes(self):
"""
# The ftypes are common across all partitions.
# The first partition will be enough.
return ray.get(_deploy_func.remote(lambda df: df.ftypes, self.df[0]))
return ray.get(_deploy_func.remote(lambda df: df.ftypes, self._df[0]))

@property
def dtypes(self):
Expand All @@ -79,7 +79,7 @@ def dtypes(self):
"""
# The dtypes are common across all partitions.
# The first partition will be enough.
return ray.get(_deploy_func.remote(lambda df: df.dtypes, self.df[0]))
return ray.get(_deploy_func.remote(lambda df: df.dtypes, self._df[0]))

@property
def empty(self):
Expand All @@ -89,7 +89,7 @@ def empty(self):
True if the DataFrame is empty.
False otherwise.
"""
all_empty = ray.get(self.map_partitions(lambda df: df.empty).df)
all_empty = ray.get(self._map_partitions(lambda df: df.empty)._df)
return False not in all_empty

@property
Expand All @@ -100,7 +100,7 @@ def values(self):
The numpy representation of this DataFrame.
"""
return np.concatenate(
ray.get(self.map_partitions(lambda df: df.values).df))
ray.get(self._map_partitions(lambda df: df.values)._df))

@property
def axes(self):
Expand All @@ -120,7 +120,7 @@ def shape(self):
"""
return (len(self.index), len(self.columns))

def map_partitions(self, func, *args):
def _map_partitions(self, func, *args):
"""Apply a function on each partition.
Args:
Expand All @@ -130,7 +130,7 @@ def map_partitions(self, func, *args):
A new DataFrame containing the result of the function.
"""
assert(callable(func))
new_df = [_deploy_func.remote(func, part) for part in self.df]
new_df = [_deploy_func.remote(func, part) for part in self._df]

return DataFrame(new_df, self.columns)

Expand All @@ -140,19 +140,19 @@ def add_prefix(self, prefix):
Returns:
A new DataFrame containing the new column names.
"""
new_dfs = self.map_partitions(lambda df: df.add_prefix(prefix))
new_dfs = self._map_partitions(lambda df: df.add_prefix(prefix))
new_cols = self.columns.map(lambda x: str(prefix) + str(x))
return DataFrame(new_dfs.df, new_cols)
return DataFrame(new_dfs._df, new_cols)

def add_suffix(self, suffix):
"""Add a suffix to each of the column names.
Returns:
A new DataFrame containing the new column names.
"""
new_dfs = self.map_partitions(lambda df: df.add_suffix(suffix))
new_dfs = self._map_partitions(lambda df: df.add_suffix(suffix))
new_cols = self.columns.map(lambda x: str(x) + str(suffix))
return DataFrame(new_dfs.df, new_cols)
return DataFrame(new_dfs._df, new_cols)

def applymap(self, func):
"""Apply a function to a DataFrame elementwise.
Expand All @@ -161,15 +161,15 @@ def applymap(self, func):
func (callable): The function to apply.
"""
assert(callable(func))
return self.map_partitions(lambda df: df.applymap(lambda x: func(x)))
return self._map_partitions(lambda df: df.applymap(lambda x: func(x)))

def copy(self, deep=True):
"""Creates a shallow copy of the DataFrame.
Returns:
A new DataFrame pointing to the same partitions as this one.
"""
return DataFrame(self.df, self.columns)
return DataFrame(self._df, self.columns)

def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
group_keys=True, squeeze=False, **kwargs):
Expand All @@ -188,12 +188,12 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
"""

indices = list(set(
[index for df in ray.get(self.df) for index in list(df.index)]))
[index for df in ray.get(self._df) for index in list(df.index)]))

chunksize = int(len(indices) / len(self.df))
chunksize = int(len(indices) / len(self._df))
partitions = []

for df in self.df:
for df in self._df:
partitions.append(_shuffle.remote(df, indices, chunksize))

partitions = ray.get(partitions)
Expand All @@ -220,7 +220,7 @@ def reduce_by_index(self, func, axis=0):
Returns:
A new DataFrame with the result of the reduction.
"""
return self.groupby(axis=axis).map_partitions(func)
return self.groupby(axis=axis)._map_partitions(func)

def sum(self, axis=None, skipna=True, level=None, numeric_only=None):
"""Perform a sum across the DataFrame.
Expand All @@ -232,7 +232,7 @@ def sum(self, axis=None, skipna=True, level=None, numeric_only=None):
Returns:
The sum of the DataFrame.
"""
sum_of_partitions = self.map_partitions(
sum_of_partitions = self._map_partitions(
lambda df: df.sum(axis=axis, skipna=skipna, level=level,
numeric_only=numeric_only))

Expand All @@ -246,7 +246,7 @@ def abs(self):
Returns:
A new DataFrame with the applied absolute value.
"""
return self.map_partitions(lambda df: df.abs())
return self._map_partitions(lambda df: df.abs())

def isin(self, values):
"""Fill a DataFrame with booleans for cells contained in values.
Expand All @@ -260,7 +260,7 @@ def isin(self, values):
True: cell is contained in values.
False: otherwise
"""
return self.map_partitions(lambda df: df.isin(values))
return self._map_partitions(lambda df: df.isin(values))

def isna(self):
"""Fill a DataFrame with booleans for cells containing NA.
Expand All @@ -271,7 +271,7 @@ def isna(self):
True: cell contains NA.
False: otherwise.
"""
return self.map_partitions(lambda df: df.isna())
return self._map_partitions(lambda df: df.isna())

def isnull(self):
"""Fill a DataFrame with booleans for cells containing a null value.
Expand All @@ -282,7 +282,7 @@ def isnull(self):
True: cell contains null.
False: otherwise.
"""
return self.map_partitions(lambda df: df.isnull)
return self._map_partitions(lambda df: df.isnull)

def keys(self):
"""Get the info axis for the DataFrame.
Expand All @@ -291,7 +291,7 @@ def keys(self):
A pandas Index for this DataFrame.
"""
# Each partition should have the same index, so we'll use 0's
return ray.get(_deploy_func.remote(lambda df: df.keys(), self.df[0]))
return ray.get(_deploy_func.remote(lambda df: df.keys(), self._df[0]))

def transpose(self, *args, **kwargs):
"""Transpose columns and rows for the DataFrame.
Expand All @@ -301,7 +301,7 @@ def transpose(self, *args, **kwargs):
Returns:
A new DataFrame transposed from this DataFrame.
"""
local_transpose = self.map_partitions(
local_transpose = self._map_partitions(
lambda df: df.transpose(*args, **kwargs))
# Sum will collapse the NAs from the groupby
return local_transpose.reduce_by_index(lambda df: df.sum(), axis=1)
Expand Down Expand Up @@ -1019,9 +1019,6 @@ def __len__(self):
def __unicode__(self):
raise NotImplementedError("Not Yet implemented.")

def __neg__(self):
raise NotImplementedError("Not Yet implemented.")

def __invert__(self):
raise NotImplementedError("Not Yet implemented.")

Expand Down Expand Up @@ -1070,6 +1067,112 @@ def __copy__(self, deep=True):
def __deepcopy__(self, memo=None):
raise NotImplementedError("Not Yet implemented.")

# NOTE(review): the stubs below reserve pandas-compatible operator hooks and
# attributes on the distributed DataFrame API surface; each one
# unconditionally raises until implemented.

def __and__(self, other):
    """Hook for the ``&`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __or__(self, other):
    """Hook for the ``|`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __xor__(self, other):
    """Hook for the ``^`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __lt__(self, other):
    """Hook for the ``<`` comparison; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __le__(self, other):
    """Hook for the ``<=`` comparison; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __gt__(self, other):
    """Hook for the ``>`` comparison; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __ge__(self, other):
    """Hook for the ``>=`` comparison; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __eq__(self, other):
    """Hook for the ``==`` comparison; not yet implemented.

    NOTE(review): raising here (instead of returning ``NotImplemented``)
    makes any ``==`` against this object raise, which also breaks hashing
    and container membership — presumably intentional while the API is
    unfinished; confirm.
    """
    raise NotImplementedError("Not Yet implemented.")

def __ne__(self, other):
    """Hook for the ``!=`` comparison; not yet implemented.

    NOTE(review): same caveat as ``__eq__`` — raising rather than
    returning ``NotImplemented``.
    """
    raise NotImplementedError("Not Yet implemented.")

def __add__(self, other):
    """Hook for the ``+`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __iadd__(self, other):
    """Hook for in-place ``+=``; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __mul__(self, other):
    """Hook for the ``*`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __imul__(self, other):
    """Hook for in-place ``*=``; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __pow__(self, other):
    """Hook for the ``**`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __ipow__(self, other):
    """Hook for in-place ``**=``; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __sub__(self, other):
    """Hook for the ``-`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __isub__(self, other):
    """Hook for in-place ``-=``; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __neg__(self):
    """Hook for unary negation (``-df``); not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __floordiv__(self, other):
    """Hook for the ``//`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __truediv__(self, other):
    """Hook for the ``/`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __mod__(self, other):
    """Hook for the ``%`` operator; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __sizeof__(self):
    """Hook for ``sys.getsizeof``; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

@property
def __doc__(self):
    """Instance docstring override; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

@property
def blocks(self):
    """pandas-parity ``blocks`` attribute; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

@property
def style(self):
    """pandas-parity ``style`` attribute; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

# NOTE(review): every def in this group (except ``is_copy``) was missing the
# ``self`` parameter.  As instance methods, the implicit instance argument
# would have been bound to the first named parameter instead (e.g.
# ``df.iat()`` bound ``df`` to ``axis``).  Adding ``self`` fixes the
# signatures without changing observable behavior: each body still
# unconditionally raises NotImplementedError.

def iat(self, axis=None):
    """Fast positional single-value indexer; not yet implemented.

    NOTE(review): in pandas ``iat`` is a property-backed indexer, not a
    method — confirm the intended API shape before implementing.
    """
    raise NotImplementedError("Not Yet implemented.")

def __rsub__(self, other, axis=None, level=None, fill_value=None):
    """Hook for reflected subtraction (``other - df``); not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def loc(self, axis=None):
    """Label-based indexer; not yet implemented.

    NOTE(review): ``loc`` is a property-backed indexer in pandas — confirm
    the intended API shape before implementing.
    """
    raise NotImplementedError("Not Yet implemented.")

@property
def is_copy(self):
    """pandas-parity ``is_copy`` attribute; not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __itruediv__(self, other):
    """Hook for in-place true division (``/=``); not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def __div__(self, other, axis=None, level=None, fill_value=None):
    """Python 2 division hook (``/`` without true division); not yet implemented."""
    raise NotImplementedError("Not Yet implemented.")

def at(self, axis=None):
    """Fast label-based single-value indexer; not yet implemented.

    NOTE(review): property-backed indexer in pandas — confirm intended API.
    """
    raise NotImplementedError("Not Yet implemented.")

def ix(self, axis=None):
    """Mixed label/positional indexer; not yet implemented.

    NOTE(review): ``ix`` was deprecated in pandas — confirm it should be
    exposed at all.
    """
    raise NotImplementedError("Not Yet implemented.")

def iloc(self, axis=None):
    """Positional indexer; not yet implemented.

    NOTE(review): property-backed indexer in pandas — confirm intended API.
    """
    raise NotImplementedError("Not Yet implemented.")


@ray.remote
def _shuffle(df, indices, chunksize):
Expand Down Expand Up @@ -1114,7 +1217,7 @@ def _local_groupby(df_rows, axis=0):

@ray.remote
def _deploy_func(func, dataframe, *args):
"""Deploys a function for the map_partitions call.
"""Deploys a function for the _map_partitions call.
Args:
dataframe (pandas.DataFrame): The pandas DataFrame for this partition.
Expand Down Expand Up @@ -1171,4 +1274,4 @@ def to_pandas(df):
Returns:
A new pandas DataFrame.
"""
return pd.concat(ray.get(df.df))
return pd.concat(ray.get(df._df))
Loading

0 comments on commit 4aca016

Please sign in to comment.