Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataFrame] Impement sort_values and sort_index #1977

Merged
merged 6 commits into from
May 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 166 additions & 6 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3272,15 +3272,175 @@ def slice_shift(self, periods=1, axis=0):
def sort_index(self, axis=0, level=None, ascending=True, inplace=False,
kind='quicksort', na_position='last', sort_remaining=True,
by=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Sort a DataFrame by one of the indices (columns or index).

Args:
axis: The axis to sort over.
level: The MultiIndex level to sort over.
ascending: Ascending or descending
inplace: Whether or not to update this DataFrame inplace.
kind: How to perform the sort.
na_position: Where to position NA on the sort.
sort_remaining: On Multilevel Index sort based on all levels.
by: (Deprecated) argument to pass to sort_values.

Returns:
A sorted DataFrame
"""
if level is not None:
raise NotImplementedError("Multilevel index not yet implemented.")

if by is not None:
warnings.warn("by argument to sort_index is deprecated, "
"please use .sort_values(by=...)",
FutureWarning, stacklevel=2)
if level is not None:
raise ValueError("unable to simultaneously sort by and level")
return self.sort_values(by, axis=axis, ascending=ascending,
inplace=inplace)

axis = pd.DataFrame()._get_axis_number(axis)

args = (axis, level, ascending, False, kind, na_position,
sort_remaining)

def _sort_helper(df, index, axis, *args):
if axis == 0:
df.index = index
else:
df.columns = index

result = df.sort_index(*args)
df.reset_index(drop=True, inplace=True)
df.columns = pd.RangeIndex(len(df.columns))
return result

if axis == 0:
index = self.index
new_column_parts = _map_partitions(
lambda df: _sort_helper(df, index, axis, *args),
self._col_partitions)

new_columns = self.columns
new_index = self.index.sort_values()
new_row_parts = None
else:
columns = self.columns
new_row_parts = _map_partitions(
lambda df: _sort_helper(df, columns, axis, *args),
self._row_partitions)

new_columns = self.columns.sort_values()
new_index = self.index
new_column_parts = None

if not inplace:
return DataFrame(col_partitions=new_column_parts,
row_partitions=new_row_parts,
index=new_index,
columns=new_columns)
else:
self._update_inplace(row_partitions=new_row_parts,
col_partitions=new_column_parts,
columns=new_columns,
index=new_index)

def sort_values(self, by, axis=0, ascending=True, inplace=False,
kind='quicksort', na_position='last'):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Sorts by a column/row or list of columns/rows.

Args:
by: A list of labels for the axis to sort over.
axis: The axis to sort.
ascending: Sort in ascending or descending order.
inplace: If true, do the operation inplace.
kind: How to sort.
na_position: Where to put np.nan values.

Returns:
A sorted DataFrame.
"""

axis = pd.DataFrame()._get_axis_number(axis)

if not is_list_like(by):
by = [by]

if axis == 0:
broadcast_value_dict = {str(col): self[col] for col in by}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be done as a single getitem call? You can pass in a list of column names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns a ray DataFrame, so we'd have to to_pandas it. It's slower overall to build that DataFrame than getitem multiple times.

broadcast_values = pd.DataFrame(broadcast_value_dict)
else:
broadcast_value_list = [to_pandas(self[row::len(self.index)])
for row in by]

index_builder = list(zip(broadcast_value_list, by))

for row, idx in index_builder:
row.index = [str(idx)]

broadcast_values = pd.concat([row for row, idx in index_builder])

# We are converting the by to string here so that we don't have a
# collision with the RangeIndex on the inner frame. It is cheap and
# gaurantees that we sort by the correct column.
by = [str(col) for col in by]

args = (by, axis, ascending, False, kind, na_position)

def _sort_helper(df, broadcast_values, axis, *args):
"""Sorts the data on a partition.

Args:
df: The DataFrame to sort.
broadcast_values: The by DataFrame to use for the sort.
axis: The axis to sort over.
args: The args for the sort.

Returns:
A new sorted DataFrame.
"""
if axis == 0:
broadcast_values.index = df.index
names = broadcast_values.columns
else:
broadcast_values.columns = df.columns
names = broadcast_values.index

return pd.concat([df, broadcast_values], axis=axis ^ 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the broadcast_values be sorted alone (which is done anyways below) and then the new index be used to reindex each of the partitions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately that is not always faster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it is slower it can be up to 3x slower, so to avoid that worst case we will leave it like this for now.

copy=False).sort_values(*args)\
.drop(names, axis=axis ^ 1)

if axis == 0:
new_column_partitions = _map_partitions(
lambda df: _sort_helper(df, broadcast_values, axis, *args),
self._col_partitions)

new_row_partitions = None
new_columns = self.columns

# This is important because it allows us to get the axis that we
# aren't sorting over. We need the order of the columns/rows and
# this will provide that in the return value.
new_index = broadcast_values.sort_values(*args).index
else:
new_row_partitions = _map_partitions(
lambda df: _sort_helper(df, broadcast_values, axis, *args),
self._row_partitions)

new_column_partitions = None
new_columns = broadcast_values.sort_values(*args).columns
new_index = self.index

if inplace:
self._update_inplace(row_partitions=new_row_partitions,
col_partitions=new_column_partitions,
columns=new_columns,
index=new_index)
else:
return DataFrame(row_partitions=new_row_partitions,
col_partitions=new_column_partitions,
columns=new_columns,
index=new_index)

def sortlevel(self, level=0, axis=0, ascending=True, inplace=False,
sort_remaining=True):
Expand Down
40 changes: 32 additions & 8 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,8 +927,6 @@ def test_append():

pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]})

print(ray_df.append(ray_df2))

assert ray_df_equals_pandas(ray_df.append(ray_df2),
pandas_df.append(pandas_df2))

Expand Down Expand Up @@ -2619,17 +2617,43 @@ def test_slice_shift():


def test_sort_index():
ray_df = create_test_dataframe()
pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Share data between the two tests.

ray_df = rdf.DataFrame(pandas_df)

with pytest.raises(NotImplementedError):
ray_df.sort_index()
pandas_result = pandas_df.sort_index()
ray_result = ray_df.sort_index()

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_index(ascending=False)
ray_result = ray_df.sort_index(ascending=False)

ray_df_equals_pandas(ray_result, pandas_result)


def test_sort_values():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a pytest fixture here, other tests can benefit from the large random dataframe being constructed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point I think this should happen, but this probably isn't the PR to go through and make all these changes to the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this can be done in the future - separately.

ray_df = create_test_dataframe()
pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100)))
ray_df = rdf.DataFrame(pandas_df)

with pytest.raises(NotImplementedError):
ray_df.sort_values(None)
pandas_result = pandas_df.sort_values(by=1)
ray_result = ray_df.sort_values(by=1)

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_values(by=1, axis=1)
ray_result = ray_df.sort_values(by=1, axis=1)

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_values(by=[1, 3])
ray_result = ray_df.sort_values(by=[1, 3])

ray_df_equals_pandas(ray_result, pandas_result)

pandas_result = pandas_df.sort_values(by=[1, 67], axis=1)
ray_result = ray_df.sort_values(by=[1, 67], axis=1)

ray_df_equals_pandas(ray_result, pandas_result)


def test_sortlevel():
Expand Down