-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DataFrame] Impement sort_values and sort_index #1977
Changes from all commits
e5289c5
bece269
02e6edf
7e8a9e3
a26b010
3bcad98
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3272,15 +3272,175 @@ def slice_shift(self, periods=1, axis=0): | |
def sort_index(self, axis=0, level=None, ascending=True, inplace=False, | ||
kind='quicksort', na_position='last', sort_remaining=True, | ||
by=None): | ||
raise NotImplementedError( | ||
"To contribute to Pandas on Ray, please visit " | ||
"github.com/ray-project/ray.") | ||
"""Sort a DataFrame by one of the indices (columns or index). | ||
|
||
Args: | ||
axis: The axis to sort over. | ||
level: The MultiIndex level to sort over. | ||
ascending: Ascending or descending | ||
inplace: Whether or not to update this DataFrame inplace. | ||
kind: How to perform the sort. | ||
na_position: Where to position NA on the sort. | ||
sort_remaining: On Multilevel Index sort based on all levels. | ||
by: (Deprecated) argument to pass to sort_values. | ||
|
||
Returns: | ||
A sorted DataFrame | ||
""" | ||
if level is not None: | ||
raise NotImplementedError("Multilevel index not yet implemented.") | ||
|
||
if by is not None: | ||
warnings.warn("by argument to sort_index is deprecated, " | ||
"please use .sort_values(by=...)", | ||
FutureWarning, stacklevel=2) | ||
if level is not None: | ||
raise ValueError("unable to simultaneously sort by and level") | ||
return self.sort_values(by, axis=axis, ascending=ascending, | ||
inplace=inplace) | ||
|
||
axis = pd.DataFrame()._get_axis_number(axis) | ||
|
||
args = (axis, level, ascending, False, kind, na_position, | ||
sort_remaining) | ||
|
||
def _sort_helper(df, index, axis, *args): | ||
if axis == 0: | ||
df.index = index | ||
else: | ||
df.columns = index | ||
|
||
result = df.sort_index(*args) | ||
df.reset_index(drop=True, inplace=True) | ||
df.columns = pd.RangeIndex(len(df.columns)) | ||
return result | ||
|
||
if axis == 0: | ||
index = self.index | ||
new_column_parts = _map_partitions( | ||
lambda df: _sort_helper(df, index, axis, *args), | ||
self._col_partitions) | ||
|
||
new_columns = self.columns | ||
new_index = self.index.sort_values() | ||
new_row_parts = None | ||
else: | ||
columns = self.columns | ||
new_row_parts = _map_partitions( | ||
lambda df: _sort_helper(df, columns, axis, *args), | ||
self._row_partitions) | ||
|
||
new_columns = self.columns.sort_values() | ||
new_index = self.index | ||
new_column_parts = None | ||
|
||
if not inplace: | ||
return DataFrame(col_partitions=new_column_parts, | ||
row_partitions=new_row_parts, | ||
index=new_index, | ||
columns=new_columns) | ||
else: | ||
self._update_inplace(row_partitions=new_row_parts, | ||
col_partitions=new_column_parts, | ||
columns=new_columns, | ||
index=new_index) | ||
|
||
def sort_values(self, by, axis=0, ascending=True, inplace=False, | ||
kind='quicksort', na_position='last'): | ||
raise NotImplementedError( | ||
"To contribute to Pandas on Ray, please visit " | ||
"github.com/ray-project/ray.") | ||
"""Sorts by a column/row or list of columns/rows. | ||
|
||
Args: | ||
by: A list of labels for the axis to sort over. | ||
axis: The axis to sort. | ||
ascending: Sort in ascending or descending order. | ||
inplace: If true, do the operation inplace. | ||
kind: How to sort. | ||
na_position: Where to put np.nan values. | ||
|
||
Returns: | ||
A sorted DataFrame. | ||
""" | ||
|
||
axis = pd.DataFrame()._get_axis_number(axis) | ||
|
||
if not is_list_like(by): | ||
by = [by] | ||
|
||
if axis == 0: | ||
broadcast_value_dict = {str(col): self[col] for col in by} | ||
broadcast_values = pd.DataFrame(broadcast_value_dict) | ||
else: | ||
broadcast_value_list = [to_pandas(self[row::len(self.index)]) | ||
for row in by] | ||
|
||
index_builder = list(zip(broadcast_value_list, by)) | ||
|
||
for row, idx in index_builder: | ||
row.index = [str(idx)] | ||
|
||
broadcast_values = pd.concat([row for row, idx in index_builder]) | ||
|
||
# We are converting the by to string here so that we don't have a | ||
# collision with the RangeIndex on the inner frame. It is cheap and | ||
# gaurantees that we sort by the correct column. | ||
by = [str(col) for col in by] | ||
|
||
args = (by, axis, ascending, False, kind, na_position) | ||
|
||
def _sort_helper(df, broadcast_values, axis, *args): | ||
"""Sorts the data on a partition. | ||
|
||
Args: | ||
df: The DataFrame to sort. | ||
broadcast_values: The by DataFrame to use for the sort. | ||
axis: The axis to sort over. | ||
args: The args for the sort. | ||
|
||
Returns: | ||
A new sorted DataFrame. | ||
""" | ||
if axis == 0: | ||
broadcast_values.index = df.index | ||
names = broadcast_values.columns | ||
else: | ||
broadcast_values.columns = df.columns | ||
names = broadcast_values.index | ||
|
||
return pd.concat([df, broadcast_values], axis=axis ^ 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately that is not always faster. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When it is slower it can be up to 3x slower, so to avoid that worst case we will leave it like this for now. |
||
copy=False).sort_values(*args)\ | ||
.drop(names, axis=axis ^ 1) | ||
|
||
if axis == 0: | ||
new_column_partitions = _map_partitions( | ||
lambda df: _sort_helper(df, broadcast_values, axis, *args), | ||
self._col_partitions) | ||
|
||
new_row_partitions = None | ||
new_columns = self.columns | ||
|
||
# This is important because it allows us to get the axis that we | ||
# aren't sorting over. We need the order of the columns/rows and | ||
# this will provide that in the return value. | ||
new_index = broadcast_values.sort_values(*args).index | ||
else: | ||
new_row_partitions = _map_partitions( | ||
lambda df: _sort_helper(df, broadcast_values, axis, *args), | ||
self._row_partitions) | ||
|
||
new_column_partitions = None | ||
new_columns = broadcast_values.sort_values(*args).columns | ||
new_index = self.index | ||
|
||
if inplace: | ||
self._update_inplace(row_partitions=new_row_partitions, | ||
col_partitions=new_column_partitions, | ||
columns=new_columns, | ||
index=new_index) | ||
else: | ||
return DataFrame(row_partitions=new_row_partitions, | ||
col_partitions=new_column_partitions, | ||
columns=new_columns, | ||
index=new_index) | ||
|
||
def sortlevel(self, level=0, axis=0, ascending=True, inplace=False, | ||
sort_remaining=True): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -927,8 +927,6 @@ def test_append(): | |
|
||
pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]}) | ||
|
||
print(ray_df.append(ray_df2)) | ||
|
||
assert ray_df_equals_pandas(ray_df.append(ray_df2), | ||
pandas_df.append(pandas_df2)) | ||
|
||
|
@@ -2619,17 +2617,43 @@ def test_slice_shift(): | |
|
||
|
||
def test_sort_index(): | ||
ray_df = create_test_dataframe() | ||
pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: Share data between the two tests. |
||
ray_df = rdf.DataFrame(pandas_df) | ||
|
||
with pytest.raises(NotImplementedError): | ||
ray_df.sort_index() | ||
pandas_result = pandas_df.sort_index() | ||
ray_result = ray_df.sort_index() | ||
|
||
ray_df_equals_pandas(ray_result, pandas_result) | ||
|
||
pandas_result = pandas_df.sort_index(ascending=False) | ||
ray_result = ray_df.sort_index(ascending=False) | ||
|
||
ray_df_equals_pandas(ray_result, pandas_result) | ||
|
||
|
||
def test_sort_values(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a pytest fixture here, other tests can benefit from the large random dataframe being constructed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At some point I think this should happen, but this probably isn't the PR to go through and make all these changes to the tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, this can be done in the future - separately. |
||
ray_df = create_test_dataframe() | ||
pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100))) | ||
ray_df = rdf.DataFrame(pandas_df) | ||
|
||
with pytest.raises(NotImplementedError): | ||
ray_df.sort_values(None) | ||
pandas_result = pandas_df.sort_values(by=1) | ||
ray_result = ray_df.sort_values(by=1) | ||
|
||
ray_df_equals_pandas(ray_result, pandas_result) | ||
|
||
pandas_result = pandas_df.sort_values(by=1, axis=1) | ||
ray_result = ray_df.sort_values(by=1, axis=1) | ||
|
||
ray_df_equals_pandas(ray_result, pandas_result) | ||
|
||
pandas_result = pandas_df.sort_values(by=[1, 3]) | ||
ray_result = ray_df.sort_values(by=[1, 3]) | ||
|
||
ray_df_equals_pandas(ray_result, pandas_result) | ||
|
||
pandas_result = pandas_df.sort_values(by=[1, 67], axis=1) | ||
ray_result = ray_df.sort_values(by=[1, 67], axis=1) | ||
|
||
ray_df_equals_pandas(ray_result, pandas_result) | ||
|
||
|
||
def test_sortlevel(): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be done as a single getitem call? You can pass in a list of column names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It returns a ray DataFrame, so we'd have to
to_pandas
it. It's slower overall to build that DataFrame thangetitem
multiple times.