Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataFrame] Implemented cummax, cummin, cumsum, cumprod #1705

Merged
merged 5 commits into from
Mar 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 119 additions & 15 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
_shuffle,
_local_groupby,
_deploy_func,
_compute_length_and_index)
_compute_length_and_index,
_prepend_partitions)


class DataFrame(object):
Expand Down Expand Up @@ -698,24 +699,127 @@ def cov(self, min_periods=None):
"github.com/ray-project/ray.")

def cummax(self, axis=None, skipna=True, *args, **kwargs):
    """Perform a cumulative maximum across the DataFrame.

    Args:
        axis (int): The axis to take maximum on.
        skipna (bool): True to skip NA values, false otherwise.

    Returns:
        The cumulative maximum of the DataFrame.
    """
    if axis == 1:
        # Row-wise cummax is independent per row, so each partition can
        # be processed entirely on its own.
        return self._map_partitions(
            lambda df: df.cummax(axis=axis, skipna=skipna,
                                 *args, **kwargs))
    else:
        # Take the max of each partition (honoring skipna so NA handling
        # matches the cumulative step), prepend the maxima of all
        # previous partitions to each partition, take the cummax, then
        # strip the prepended rows.
        local_max = [_deploy_func.remote(
            lambda df: pd.DataFrame(df.max(skipna=skipna)).T, self._df[i])
            for i in range(len(self._df))]
        new_df = DataFrame(local_max, self.columns)
        last_row_df = pd.DataFrame([df.iloc[-1, :]
                                    for df in ray.get(new_df._df)])
        cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
                                             lambda df:
                                             df.cummax(axis=axis,
                                                       skipna=skipna,
                                                       *args, **kwargs))
                  for i in range(len(self._df))]
        return DataFrame(cum_df, self.columns)

def cummin(self, axis=None, skipna=True, *args, **kwargs):
    """Perform a cumulative minimum across the DataFrame.

    Args:
        axis (int): The axis to cummin on.
        skipna (bool): True to skip NA values, false otherwise.

    Returns:
        The cumulative minimum of the DataFrame.
    """
    if axis == 1:
        # Row-wise cummin is independent per row, so each partition can
        # be processed entirely on its own.
        return self._map_partitions(
            lambda df: df.cummin(axis=axis, skipna=skipna,
                                 *args, **kwargs))
    else:
        # Take the min of each partition (honoring skipna so NA handling
        # matches the cumulative step), prepend the minima of all
        # previous partitions to each partition, take the cummin, then
        # strip the prepended rows.
        local_min = [_deploy_func.remote(
            lambda df: pd.DataFrame(df.min(skipna=skipna)).T, self._df[i])
            for i in range(len(self._df))]
        new_df = DataFrame(local_min, self.columns)
        last_row_df = pd.DataFrame([df.iloc[-1, :]
                                    for df in ray.get(new_df._df)])
        cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
                                             lambda df:
                                             df.cummin(axis=axis,
                                                       skipna=skipna,
                                                       *args, **kwargs))
                  for i in range(len(self._df))]
        return DataFrame(cum_df, self.columns)

def cumprod(self, axis=None, skipna=True, *args, **kwargs):
    """Perform a cumulative product across the DataFrame.

    Args:
        axis (int): The axis to take product on.
        skipna (bool): True to skip NA values, false otherwise.

    Returns:
        The cumulative product of the DataFrame.
    """
    if axis == 1:
        # Row-wise cumprod is independent per row, so each partition can
        # be processed entirely on its own.
        return self._map_partitions(
            lambda df: df.cumprod(axis=axis, skipna=skipna,
                                  *args, **kwargs))
    else:
        # Take the product of each partition (honoring skipna so NA
        # handling matches the cumulative step), prepend the products of
        # all previous partitions to each partition, take the cumprod,
        # then strip the prepended rows.
        local_prod = [_deploy_func.remote(
            lambda df: pd.DataFrame(df.prod(skipna=skipna)).T, self._df[i])
            for i in range(len(self._df))]
        new_df = DataFrame(local_prod, self.columns)
        last_row_df = pd.DataFrame([df.iloc[-1, :]
                                    for df in ray.get(new_df._df)])
        cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
                                             lambda df:
                                             df.cumprod(axis=axis,
                                                        skipna=skipna,
                                                        *args, **kwargs))
                  for i in range(len(self._df))]
        return DataFrame(cum_df, self.columns)

def cumsum(self, axis=None, skipna=True, *args, **kwargs):
    """Perform a cumulative sum across the DataFrame.

    Args:
        axis (int): The axis to take sum on.
        skipna (bool): True to skip NA values, false otherwise.

    Returns:
        The cumulative sum of the DataFrame.
    """
    if axis == 1:
        # Row-wise cumsum is independent per row, so each partition can
        # be processed entirely on its own.
        return self._map_partitions(
            lambda df: df.cumsum(axis=axis, skipna=skipna,
                                 *args, **kwargs))
    else:
        # first take the sum of each partition (honoring skipna so NA
        # handling matches the cumulative step),
        # append the sums of all previous partitions to current partition
        # take cumsum and remove the appended rows
        local_sum = [_deploy_func.remote(
            lambda df: pd.DataFrame(df.sum(skipna=skipna)).T, self._df[i])
            for i in range(len(self._df))]
        new_df = DataFrame(local_sum, self.columns)
        last_row_df = pd.DataFrame([df.iloc[-1, :]
                                    for df in ray.get(new_df._df)])
        cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
                                             lambda df:
                                             df.cumsum(axis=axis,
                                                       skipna=skipna,
                                                       *args, **kwargs))
                  for i in range(len(self._df))]
        return DataFrame(cum_df, self.columns)

def describe(self, percentiles=None, include=None, exclude=None):
raise NotImplementedError(
Expand Down Expand Up @@ -1503,7 +1607,7 @@ def max(self, axis=None, skipna=None, level=None, numeric_only=None,
Returns:
The max of the DataFrame.
"""
if(axis == 1):
if axis == 1:
return self._map_partitions(
lambda df: df.max(axis=axis, skipna=skipna, level=level,
numeric_only=numeric_only, **kwargs))
Expand Down Expand Up @@ -1553,7 +1657,7 @@ def min(self, axis=None, skipna=None, level=None, numeric_only=None,
Returns:
The min of the DataFrame.
"""
if(axis == 1):
if axis == 1:
return self._map_partitions(
lambda df: df.min(axis=axis, skipna=skipna, level=level,
numeric_only=numeric_only, **kwargs))
Expand Down
70 changes: 39 additions & 31 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ def test_int_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)

test_loc(ray_df, pandas_df)
test_iloc(ray_df, pandas_df)
Expand Down Expand Up @@ -333,6 +337,10 @@ def test_float_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)

test___len__(ray_df, pandas_df)
test_first_valid_index(ray_df, pandas_df)
Expand Down Expand Up @@ -451,6 +459,10 @@ def test_mixed_dtype_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
# test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)

test___len__(ray_df, pandas_df)
test_first_valid_index(ray_df, pandas_df)
Expand Down Expand Up @@ -558,6 +570,10 @@ def test_nan_dataframe():
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_cummax(ray_df, pandas_df)
test_cummin(ray_df, pandas_df)
test_cumprod(ray_df, pandas_df)
test_cumsum(ray_df, pandas_df)

test___len__(ray_df, pandas_df)
test_first_valid_index(ray_df, pandas_df)
Expand Down Expand Up @@ -824,32 +840,24 @@ def test_cov():
ray_df.cov()


def test_cummax():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.cummax()


def test_cummin():
ray_df = create_test_dataframe()
@pytest.fixture
def test_cummax(ray_df, pandas_df):
    """Check that the Ray cummax matches the pandas cummax."""
    matches = ray_df_equals_pandas(ray_df.cummax(), pandas_df.cummax())
    assert matches

with pytest.raises(NotImplementedError):
ray_df.cummin()

@pytest.fixture
def test_cummin(ray_df, pandas_df):
    """Check that the Ray cummin matches the pandas cummin."""
    matches = ray_df_equals_pandas(ray_df.cummin(), pandas_df.cummin())
    assert matches

def test_cumprod():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.cumprod()

@pytest.fixture
def test_cumprod(ray_df, pandas_df):
    """Check that the Ray cumprod matches the pandas cumprod."""
    matches = ray_df_equals_pandas(ray_df.cumprod(), pandas_df.cumprod())
    assert matches

def test_cumsum():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.cumsum()
@pytest.fixture
def test_cumsum(ray_df, pandas_df):
    """Check that the Ray cumsum matches the pandas cumsum."""
    matches = ray_df_equals_pandas(ray_df.cumsum(), pandas_df.cumsum())
    assert matches


def test_describe():
Expand Down Expand Up @@ -1897,7 +1905,7 @@ def test_prod():
ray_df = create_test_dataframe()

with pytest.raises(NotImplementedError):
ray_df.prod()
ray_df.prod(None)


def test_product():
Expand Down Expand Up @@ -1985,7 +1993,7 @@ def test_rename_sanity(num_partitions=2):
'D': 'd'
}

ray_df = rdf.from_pandas(test_data.frame, num_partitions)
ray_df = from_pandas(test_data.frame, num_partitions)
assert ray_df_equals_pandas(
ray_df.rename(columns=mapping),
test_data.frame.rename(columns=mapping)
Expand All @@ -1997,7 +2005,7 @@ def test_rename_sanity(num_partitions=2):
renamed2
)

ray_df = rdf.from_pandas(renamed2, num_partitions)
ray_df = from_pandas(renamed2, num_partitions)
assert ray_df_equals_pandas(
ray_df.rename(columns=str.upper),
renamed2.rename(columns=str.upper)
Expand All @@ -2010,7 +2018,7 @@ def test_rename_sanity(num_partitions=2):

# gets sorted alphabetical
df = pd.DataFrame(data)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
tm.assert_index_equal(
ray_df.rename(index={'foo': 'bar', 'bar': 'foo'}).index,
df.rename(index={'foo': 'bar', 'bar': 'foo'}).index
Expand All @@ -2026,7 +2034,7 @@ def test_rename_sanity(num_partitions=2):

# partial columns
renamed = test_data.frame.rename(columns={'C': 'foo', 'D': 'bar'})
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
ray_df = from_pandas(test_data.frame, num_partitions)
tm.assert_index_equal(
ray_df.rename(columns={'C': 'foo', 'D': 'bar'}).index,
test_data.frame.rename(columns={'C': 'foo', 'D': 'bar'}).index
Expand All @@ -2044,7 +2052,7 @@ def test_rename_sanity(num_partitions=2):
index = pd.Index(['foo', 'bar'], name='name')
renamer = pd.DataFrame(data, index=index)

ray_df = rdf.from_pandas(renamer, num_partitions)
ray_df = from_pandas(renamer, num_partitions)
renamed = renamer.rename(index={'foo': 'bar', 'bar': 'foo'})
ray_renamed = ray_df.rename(index={'foo': 'bar', 'bar': 'foo'})
tm.assert_index_equal(
Expand All @@ -2062,7 +2070,7 @@ def test_rename_multiindex(num_partitions=2):
columns = pd.MultiIndex.from_tuples(
tuples_columns, names=['fizz', 'buzz'])
df = pd.DataFrame([(0, 0), (1, 1)], index=index, columns=columns)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)

#
# without specifying level -> across all levels
Expand Down Expand Up @@ -2133,7 +2141,7 @@ def test_rename_multiindex(num_partitions=2):
@pytest.fixture
def test_rename_nocopy(num_partitions=2):
test_data = TestData().frame
ray_df = rdf.from_pandas(test_data, num_partitions)
ray_df = from_pandas(test_data, num_partitions)
ray_renamed = ray_df.rename(columns={'C': 'foo'}, copy=False)
ray_renamed['foo'] = 1
assert (ray_df['C'] == 1).all()
Expand All @@ -2142,7 +2150,7 @@ def test_rename_nocopy(num_partitions=2):
@pytest.fixture
def test_rename_inplace(num_partitions=2):
test_data = TestData().frame
ray_df = rdf.from_pandas(test_data, num_partitions)
ray_df = from_pandas(test_data, num_partitions)

assert ray_df_equals_pandas(
ray_df.rename(columns={'C': 'foo'}),
Expand All @@ -2165,7 +2173,7 @@ def test_rename_bug(num_partitions=2):
# GH 5344
# rename set ref_locs, and set_index was not resetting
df = pd.DataFrame({0: ['foo', 'bar'], 1: ['bah', 'bas'], 2: [1, 2]})
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
df = df.rename(columns={0: 'a'})
df = df.rename(columns={1: 'b'})
# TODO: Uncomment when set_index is implemented
Expand All @@ -2191,7 +2199,7 @@ def test_rename_axis():
@pytest.fixture
def test_rename_axis_inplace(num_partitions=2):
test_frame = TestData().frame
ray_df = rdf.from_pandas(test_frame, num_partitions)
ray_df = from_pandas(test_frame, num_partitions)

# GH 15704
result = test_frame.copy()
Expand Down
7 changes: 7 additions & 0 deletions python/ray/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,10 @@ def _compute_length_and_index(dfs):
for j in range(lengths[i])]}

return lengths, pd.DataFrame(dest_indices)


@ray.remote
def _prepend_partitions(last_vals, index, partition, func):
    """Apply a cumulative function to a partition given its predecessors.

    Prepends the reduced rows of all earlier partitions (the first
    ``index`` rows of ``last_vals``) to ``partition``, applies ``func``
    (a cumulative operation) to the combined frame, and drops the
    prepended rows so only ``partition``'s rows are returned.

    Args:
        last_vals: DataFrame whose row i summarizes partition i.
        index: Position of ``partition`` among all partitions.
        partition: The partition to operate on.
        func: Cumulative function to apply to the combined frame.

    Returns:
        The result of ``func`` restricted to ``partition``'s rows.
    """
    prefix = last_vals.iloc[:index]
    combined = prefix.append(partition)
    return func(combined).iloc[index:]