Skip to content

Commit

Permalink
[DataFrame] Fix transpose with nan values and add functionality neede…
Browse files Browse the repository at this point in the history
…d for Index (ray-project#1545)
  • Loading branch information
devin-petersohn authored and robertnishihara committed Feb 21, 2018
1 parent db4a920 commit de6fa02
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 38 deletions.
105 changes: 79 additions & 26 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import ray
import itertools

from .index import Index


class DataFrame(object):

def __init__(self, df, columns):
def __init__(self, df, columns, index=None):
"""Distributed DataFrame object backed by Pandas dataframes.
Args:
Expand All @@ -22,29 +24,53 @@ def __init__(self, df, columns):
assert(len(df) > 0)

self._df = df
# TODO: Clean up later.
# We will call get only when we access the object (and only once).
self._lengths = \
ray.get([_deploy_func.remote(_get_lengths, d) for d in self._df])
self.columns = columns

if index is None:
self._index = self._default_index()
else:
self._index = index

self._pd_index = None

def __str__(self):
return "ray.DataFrame object"

def __repr__(self):
return "ray.DataFrame object"

@property
def index(self):
def _get_index(self):
"""Get the index for this DataFrame.
Returns:
The union of all indexes across the partitions.
"""
indices = ray.get(self._map_partitions(lambda df: df.index)._df)
if isinstance(indices[0], pd.RangeIndex):
merged = indices[0]
for index in indices[1:]:
merged = merged.union(index)
return merged
else:
return indices[0].append(indices[1:])
if self._pd_index is None:
self._pd_index = Index.to_pandas(self._index)

return self._pd_index

def _set_index(self, new_index):
"""Set the index for this DataFrame.
Args:
new_index: The new index to set this
"""
self._pd_index = None
self._index = Index.from_pandas(new_index, self._lengths)

def _default_index(self):
dest_indices = [(i, j)
for i in range(len(self._lengths))
for j in range(self._lengths[i])]
return Index({i: dest_indices[i] for i in range(len(dest_indices))},
pd.RangeIndex)

index = property(_get_index, _set_index)

@property
def size(self):
Expand Down Expand Up @@ -140,7 +166,7 @@ def _map_partitions(self, func, *args):
assert(callable(func))
new_df = [_deploy_func.remote(func, part) for part in self._df]

return DataFrame(new_df, self.columns)
return DataFrame(new_df, self.columns, index=self._index)

def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
Expand All @@ -150,7 +176,7 @@ def add_prefix(self, prefix):
"""
new_dfs = self._map_partitions(lambda df: df.add_prefix(prefix))
new_cols = self.columns.map(lambda x: str(prefix) + str(x))
return DataFrame(new_dfs._df, new_cols)
return DataFrame(new_dfs._df, new_cols, index=self._index)

def add_suffix(self, suffix):
"""Add a suffix to each of the column names.
Expand All @@ -160,7 +186,7 @@ def add_suffix(self, suffix):
"""
new_dfs = self._map_partitions(lambda df: df.add_suffix(suffix))
new_cols = self.columns.map(lambda x: str(x) + str(suffix))
return DataFrame(new_dfs._df, new_cols)
return DataFrame(new_dfs._df, new_cols, index=self._index)

def applymap(self, func):
"""Apply a function to a DataFrame elementwise.
Expand All @@ -177,7 +203,7 @@ def copy(self, deep=True):
Returns:
A new DataFrame pointing to the same partitions as this one.
"""
return DataFrame(self._df, self.columns)
return DataFrame(self._df, self.columns, index=self._index)

def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
group_keys=True, squeeze=False, **kwargs):
Expand All @@ -199,11 +225,8 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
[index for df in ray.get(self._df) for index in list(df.index)]))

chunksize = int(len(indices) / len(self._df))
partitions = []

for df in self._df:
partitions.append(_shuffle.remote(df, indices, chunksize))

partitions = [_shuffle.remote(df, indices, chunksize)
for df in self._df]
partitions = ray.get(partitions)

# Transpose the list of dataframes
Expand All @@ -213,7 +236,6 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
shuffle.append([])
for j in range(len(partitions)):
shuffle[i].append(partitions[j][i])

new_dfs = [_local_groupby.remote(part, axis=axis) for part in shuffle]

return DataFrame(new_dfs, self.columns)
Expand Down Expand Up @@ -311,8 +333,10 @@ def transpose(self, *args, **kwargs):
"""
local_transpose = self._map_partitions(
lambda df: df.transpose(*args, **kwargs))

# Sum will collapse the NAs from the groupby
return local_transpose.reduce_by_index(lambda df: df.sum(), axis=1)
return local_transpose.reduce_by_index(
lambda df: df.apply(lambda x: x), axis=1)

T = property(transpose)

Expand Down Expand Up @@ -1502,6 +1526,24 @@ def iloc(axis=None):
raise NotImplementedError("Not Yet implemented.")


def _get_lengths(df):
"""Gets the length of the dataframe.
Args:
df: A remote pd.DataFrame object.
Returns:
Returns an integer length of the dataframe object. If the attempt
fails, returns 0 as the length.
"""
try:
return len(df)
# Because we sometimes have cases where we have summary statistics in our
# DataFrames
except TypeError:
return 0


@ray.remote
def _shuffle(df, indices, chunksize):
"""Shuffle data by sending it through the Ray Store.
Expand All @@ -1518,12 +1560,12 @@ def _shuffle(df, indices, chunksize):
i = 0
partition = []
while len(indices) > chunksize:
oids = df.reindex(indices[:chunksize]).dropna()
oids = df.reindex(indices[:chunksize])
partition.append(oids)
indices = indices[chunksize:]
i += 1
else:
oids = df.reindex(indices).dropna()
oids = df.reindex(indices)
partition.append(oids)
return partition

Expand Down Expand Up @@ -1581,16 +1623,27 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True):
elif chunksize is None:
raise ValueError("The number of partitions or chunksize must be set.")

old_index = df.index

# TODO stop reassigning df
dataframes = []
lengths = []
while len(df) > chunksize:
top = ray.put(df[:chunksize])
t_df = df[:chunksize]
lengths.append(len(t_df))
# reindex here because we want a pd.RangeIndex within the partitions.
# It is smaller and sometimes faster.
t_df.reindex()
top = ray.put(t_df)
dataframes.append(top)
df = df[chunksize:]
else:
dataframes.append(ray.put(df))
lengths.append(len(df))

ray_index = Index.from_pandas(old_index, lengths)

return DataFrame(dataframes, df.columns)
return DataFrame(dataframes, df.columns, index=ray_index)


def to_pandas(df):
Expand Down
51 changes: 43 additions & 8 deletions python/ray/dataframe/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,50 @@

class Index(object):

def __init__(self, idx):
def __init__(self, idx, pandas_type):
self.idx = idx
self.pandas_type = pandas_type

def __getitem__(self, item):
return self.idx[item]

def __len__(self):
return len(self.idx)

@classmethod
def to_pandas(indices):
if isinstance(indices[0], pd.RangeIndex):
merged = indices[0]
for index in indices[1:]:
merged = merged.union(index)
return merged
def to_pandas(cls, index):
"""Convert a Ray Index object to a Pandas Index object.
Args:
index (ray.Index): A Ray Index object.
Returns:
A pandas Index object.
"""
k = index.idx.keys()
if index.pandas_type is pd.RangeIndex:
return pd.RangeIndex(min(k), max(k) + 1)
else:
return indices[0].append(indices[1:])
return pd.Index(k)

@classmethod
def from_pandas(cls, pd_index, lengths):
"""Convert a Pandas Index object to a Ray Index object.
Args:
pd_index (pd.Index): A Pandas Index object.
lengths ([int]): A list of lengths for the partitions.
Returns:
A Ray Index object.
"""
dest_indices = [(i, j)
for i in range(len(lengths))
for j in range(lengths[i])]
if len(pd_index) != len(dest_indices):
raise ValueError(
"Length of index given does not match current dataframe")

return Index(
{pd_index[i]: dest_indices[i] for i in range(len(dest_indices))},
type(pd_index))
56 changes: 52 additions & 4 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ def test_roundtrip(ray_df, pandas_df):
@pytest.fixture
def test_index(ray_df, pandas_df):
assert(ray_df.index.equals(pandas_df.index))
ray_df_cp = ray_df.copy()
pandas_df_cp = pandas_df.copy()

ray_df_cp.index = [str(i) for i in ray_df_cp.index]
pandas_df_cp.index = [str(i) for i in pandas_df_cp.index]
assert(ray_df_cp.index.sort_values().equals(pandas_df_cp.index))


@pytest.fixture
Expand All @@ -41,10 +47,7 @@ def test_ftypes(ray_df, pandas_df):

@pytest.fixture
def test_values(ray_df, pandas_df):
a = np.ndarray.flatten(ray_df.values)
b = np.ndarray.flatten(pandas_df.values)
for c, d in zip(a, b):
assert(c == d or (np.isnan(c) and np.isnan(d)))
np.testing.assert_equal(ray_df.values, pandas_df.values)


@pytest.fixture
Expand Down Expand Up @@ -339,6 +342,51 @@ def test_mixed_dtype_dataframe():
test_notnull(ray_df, pandas_df)


def test_nan_dataframe():
pandas_df = pd.DataFrame({
'col1': [1, 2, 3, np.nan],
'col2': [4, 5, np.nan, 7],
'col3': [8, np.nan, 10, 11],
'col4': [np.nan, 13, 14, 15]})

ray_df = rdf.from_pandas(pandas_df, 2)

testfuncs = [lambda x: x + x,
lambda x: str(x),
lambda x: x,
lambda x: False]

keys = ['col1',
'col2',
'col3',
'col4']

test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
test_ndim(ray_df, pandas_df)
test_ftypes(ray_df, pandas_df)
test_values(ray_df, pandas_df)
test_axes(ray_df, pandas_df)
test_shape(ray_df, pandas_df)
test_add_prefix(ray_df, pandas_df)
test_add_suffix(ray_df, pandas_df)

for testfunc in testfuncs:
test_applymap(ray_df, pandas_df, testfunc)

test_copy(ray_df)
test_sum(ray_df, pandas_df)
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)

for key in keys:
test_get(ray_df, pandas_df, key)

test_get_dtype_counts(ray_df, pandas_df)
test_get_ftype_counts(ray_df, pandas_df)


def test_add():
ray_df = create_test_dataframe()

Expand Down

0 comments on commit de6fa02

Please sign in to comment.