Skip to content

Commit

Permalink
[DataFrame] Impement sort_values and sort_index (ray-project#1977)
Browse files Browse the repository at this point in the history
  • Loading branch information
devin-petersohn authored and robertnishihara committed May 6, 2018
1 parent 9f28529 commit ad1afeb
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 14 deletions.
172 changes: 166 additions & 6 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3839,15 +3839,175 @@ def slice_shift(self, periods=1, axis=0):
def sort_index(self, axis=0, level=None, ascending=True, inplace=False,
kind='quicksort', na_position='last', sort_remaining=True,
by=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Sort a DataFrame by one of the indices (columns or index).
Args:
axis: The axis to sort over.
level: The MultiIndex level to sort over.
ascending: Ascending or descending
inplace: Whether or not to update this DataFrame inplace.
kind: How to perform the sort.
na_position: Where to position NA on the sort.
sort_remaining: On Multilevel Index sort based on all levels.
by: (Deprecated) argument to pass to sort_values.
Returns:
A sorted DataFrame
"""
if level is not None:
raise NotImplementedError("Multilevel index not yet implemented.")

if by is not None:
warnings.warn("by argument to sort_index is deprecated, "
"please use .sort_values(by=...)",
FutureWarning, stacklevel=2)
if level is not None:
raise ValueError("unable to simultaneously sort by and level")
return self.sort_values(by, axis=axis, ascending=ascending,
inplace=inplace)

axis = pd.DataFrame()._get_axis_number(axis)

args = (axis, level, ascending, False, kind, na_position,
sort_remaining)

def _sort_helper(df, index, axis, *args):
if axis == 0:
df.index = index
else:
df.columns = index

result = df.sort_index(*args)
df.reset_index(drop=True, inplace=True)
df.columns = pd.RangeIndex(len(df.columns))
return result

if axis == 0:
index = self.index
new_column_parts = _map_partitions(
lambda df: _sort_helper(df, index, axis, *args),
self._col_partitions)

new_columns = self.columns
new_index = self.index.sort_values()
new_row_parts = None
else:
columns = self.columns
new_row_parts = _map_partitions(
lambda df: _sort_helper(df, columns, axis, *args),
self._row_partitions)

new_columns = self.columns.sort_values()
new_index = self.index
new_column_parts = None

if not inplace:
return DataFrame(col_partitions=new_column_parts,
row_partitions=new_row_parts,
index=new_index,
columns=new_columns)
else:
self._update_inplace(row_partitions=new_row_parts,
col_partitions=new_column_parts,
columns=new_columns,
index=new_index)

def sort_values(self, by, axis=0, ascending=True, inplace=False,
kind='quicksort', na_position='last'):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Sorts by a column/row or list of columns/rows.
Args:
by: A list of labels for the axis to sort over.
axis: The axis to sort.
ascending: Sort in ascending or descending order.
inplace: If true, do the operation inplace.
kind: How to sort.
na_position: Where to put np.nan values.
Returns:
A sorted DataFrame.
"""

axis = pd.DataFrame()._get_axis_number(axis)

if not is_list_like(by):
by = [by]

if axis == 0:
broadcast_value_dict = {str(col): self[col] for col in by}
broadcast_values = pd.DataFrame(broadcast_value_dict)
else:
broadcast_value_list = [to_pandas(self[row::len(self.index)])
for row in by]

index_builder = list(zip(broadcast_value_list, by))

for row, idx in index_builder:
row.index = [str(idx)]

broadcast_values = pd.concat([row for row, idx in index_builder])

# We are converting the by to string here so that we don't have a
# collision with the RangeIndex on the inner frame. It is cheap and
# gaurantees that we sort by the correct column.
by = [str(col) for col in by]

args = (by, axis, ascending, False, kind, na_position)

def _sort_helper(df, broadcast_values, axis, *args):
"""Sorts the data on a partition.
Args:
df: The DataFrame to sort.
broadcast_values: The by DataFrame to use for the sort.
axis: The axis to sort over.
args: The args for the sort.
Returns:
A new sorted DataFrame.
"""
if axis == 0:
broadcast_values.index = df.index
names = broadcast_values.columns
else:
broadcast_values.columns = df.columns
names = broadcast_values.index

return pd.concat([df, broadcast_values], axis=axis ^ 1,
copy=False).sort_values(*args)\
.drop(names, axis=axis ^ 1)

if axis == 0:
new_column_partitions = _map_partitions(
lambda df: _sort_helper(df, broadcast_values, axis, *args),
self._col_partitions)

new_row_partitions = None
new_columns = self.columns

# This is important because it allows us to get the axis that we
# aren't sorting over. We need the order of the columns/rows and
# this will provide that in the return value.
new_index = broadcast_values.sort_values(*args).index
else:
new_row_partitions = _map_partitions(
lambda df: _sort_helper(df, broadcast_values, axis, *args),
self._row_partitions)

new_column_partitions = None
new_columns = broadcast_values.sort_values(*args).columns
new_index = self.index

if inplace:
self._update_inplace(row_partitions=new_row_partitions,
col_partitions=new_column_partitions,
columns=new_columns,
index=new_index)
else:
return DataFrame(row_partitions=new_row_partitions,
col_partitions=new_column_partitions,
columns=new_columns,
index=new_index)

def sortlevel(self, level=0, axis=0, ascending=True, inplace=False,
sort_remaining=True):
Expand Down
40 changes: 32 additions & 8 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,6 @@ def test_append():

pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]})

print(ray_df.append(ray_df2))

assert ray_df_equals_pandas(ray_df.append(ray_df2),
pandas_df.append(pandas_df2))

Expand Down Expand Up @@ -2829,17 +2827,43 @@ def test_slice_shift():


def test_sort_index():
ray_df = create_test_dataframe()
pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100)))
ray_df = rdf.DataFrame(pandas_df)

with pytest.raises(NotImplementedError):
ray_df.sort_index()
pandas_result = pandas_df.sort_index()
ray_result = ray_df.sort_index()

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_index(ascending=False)
ray_result = ray_df.sort_index(ascending=False)

ray_df_equals_pandas(ray_result, pandas_result)


def test_sort_values():
ray_df = create_test_dataframe()
pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100)))
ray_df = rdf.DataFrame(pandas_df)

with pytest.raises(NotImplementedError):
ray_df.sort_values(None)
pandas_result = pandas_df.sort_values(by=1)
ray_result = ray_df.sort_values(by=1)

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_values(by=1, axis=1)
ray_result = ray_df.sort_values(by=1, axis=1)

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_values(by=[1, 3])
ray_result = ray_df.sort_values(by=[1, 3])

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_values(by=[1, 67], axis=1)
ray_result = ray_df.sort_values(by=[1, 67], axis=1)

ray_df_equals_pandas(ray_result, pandas_result)


def test_sortlevel():
Expand Down

0 comments on commit ad1afeb

Please sign in to comment.