From ad1afeb268fcfe5cf1e9ce1a030445f9b324412b Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Sun, 6 May 2018 09:53:29 -0700 Subject: [PATCH] [DataFrame] Impement sort_values and sort_index (#1977) --- python/ray/dataframe/dataframe.py | 172 +++++++++++++++++++- python/ray/dataframe/test/test_dataframe.py | 40 ++++- 2 files changed, 198 insertions(+), 14 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index bd6558fd3dcb..5abd98f361bf 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -3839,15 +3839,175 @@ def slice_shift(self, periods=1, axis=0): def sort_index(self, axis=0, level=None, ascending=True, inplace=False, kind='quicksort', na_position='last', sort_remaining=True, by=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Sort a DataFrame by one of the indices (columns or index). + + Args: + axis: The axis to sort over. + level: The MultiIndex level to sort over. + ascending: Ascending or descending + inplace: Whether or not to update this DataFrame inplace. + kind: How to perform the sort. + na_position: Where to position NA on the sort. + sort_remaining: On Multilevel Index sort based on all levels. + by: (Deprecated) argument to pass to sort_values. + + Returns: + A sorted DataFrame + """ + if level is not None: + raise NotImplementedError("Multilevel index not yet implemented.") + + if by is not None: + warnings.warn("by argument to sort_index is deprecated, " + "please use .sort_values(by=...)", + FutureWarning, stacklevel=2) + if level is not None: + raise ValueError("unable to simultaneously sort by and level") + return self.sort_values(by, axis=axis, ascending=ascending, + inplace=inplace) + + axis = pd.DataFrame()._get_axis_number(axis) + + args = (axis, level, ascending, False, kind, na_position, + sort_remaining) + + def _sort_helper(df, index, axis, *args): + if axis == 0: + df.index = index + else: + df.columns = index + + result = df.sort_index(*args) + df.reset_index(drop=True, inplace=True) + df.columns = pd.RangeIndex(len(df.columns)) + return result + + if axis == 0: + index = self.index + new_column_parts = _map_partitions( + lambda df: _sort_helper(df, index, axis, *args), + self._col_partitions) + + new_columns = self.columns + new_index = self.index.sort_values() + new_row_parts = None + else: + columns = self.columns + new_row_parts = _map_partitions( + lambda df: _sort_helper(df, columns, axis, *args), + self._row_partitions) + + new_columns = self.columns.sort_values() + new_index = self.index + new_column_parts = None + + if not inplace: + return DataFrame(col_partitions=new_column_parts, + row_partitions=new_row_parts, + index=new_index, + columns=new_columns) + else: + self._update_inplace(row_partitions=new_row_parts, + col_partitions=new_column_parts, + columns=new_columns, + index=new_index) def sort_values(self, by, axis=0, ascending=True, inplace=False, kind='quicksort', na_position='last'): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Sorts by a column/row or list of columns/rows. + + Args: + by: A list of labels for the axis to sort over. + axis: The axis to sort. + ascending: Sort in ascending or descending order. + inplace: If true, do the operation inplace. + kind: How to sort. + na_position: Where to put np.nan values. + + Returns: + A sorted DataFrame. + """ + + axis = pd.DataFrame()._get_axis_number(axis) + + if not is_list_like(by): + by = [by] + + if axis == 0: + broadcast_value_dict = {str(col): self[col] for col in by} + broadcast_values = pd.DataFrame(broadcast_value_dict) + else: + broadcast_value_list = [to_pandas(self[row::len(self.index)]) + for row in by] + + index_builder = list(zip(broadcast_value_list, by)) + + for row, idx in index_builder: + row.index = [str(idx)] + + broadcast_values = pd.concat([row for row, idx in index_builder]) + + # We are converting the by to string here so that we don't have a + # collision with the RangeIndex on the inner frame. It is cheap and + # gaurantees that we sort by the correct column. + by = [str(col) for col in by] + + args = (by, axis, ascending, False, kind, na_position) + + def _sort_helper(df, broadcast_values, axis, *args): + """Sorts the data on a partition. + + Args: + df: The DataFrame to sort. + broadcast_values: The by DataFrame to use for the sort. + axis: The axis to sort over. + args: The args for the sort. + + Returns: + A new sorted DataFrame. + """ + if axis == 0: + broadcast_values.index = df.index + names = broadcast_values.columns + else: + broadcast_values.columns = df.columns + names = broadcast_values.index + + return pd.concat([df, broadcast_values], axis=axis ^ 1, + copy=False).sort_values(*args)\ + .drop(names, axis=axis ^ 1) + + if axis == 0: + new_column_partitions = _map_partitions( + lambda df: _sort_helper(df, broadcast_values, axis, *args), + self._col_partitions) + + new_row_partitions = None + new_columns = self.columns + + # This is important because it allows us to get the axis that we + # aren't sorting over. We need the order of the columns/rows and + # this will provide that in the return value. + new_index = broadcast_values.sort_values(*args).index + else: + new_row_partitions = _map_partitions( + lambda df: _sort_helper(df, broadcast_values, axis, *args), + self._row_partitions) + + new_column_partitions = None + new_columns = broadcast_values.sort_values(*args).columns + new_index = self.index + + if inplace: + self._update_inplace(row_partitions=new_row_partitions, + col_partitions=new_column_partitions, + columns=new_columns, + index=new_index) + else: + return DataFrame(row_partitions=new_row_partitions, + col_partitions=new_column_partitions, + columns=new_columns, + index=new_index) def sortlevel(self, level=0, axis=0, ascending=True, inplace=False, sort_remaining=True): diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index fa3fb5667e56..7660b5366447 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -969,8 +969,6 @@ def test_append(): pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]}) - print(ray_df.append(ray_df2)) - assert ray_df_equals_pandas(ray_df.append(ray_df2), pandas_df.append(pandas_df2)) @@ -2829,17 +2827,43 @@ def test_slice_shift(): def test_sort_index(): - ray_df = create_test_dataframe() + pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100))) + ray_df = rdf.DataFrame(pandas_df) - with pytest.raises(NotImplementedError): - ray_df.sort_index() + pandas_result = pandas_df.sort_index() + ray_result = ray_df.sort_index() + + ray_df_equals_pandas(ray_result, pandas_result) + + pandas_result = pandas_df.sort_index(ascending=False) + ray_result = ray_df.sort_index(ascending=False) + + ray_df_equals_pandas(ray_result, pandas_result) def test_sort_values(): - ray_df = create_test_dataframe() + pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100))) + ray_df = rdf.DataFrame(pandas_df) - with pytest.raises(NotImplementedError): - ray_df.sort_values(None) + pandas_result = pandas_df.sort_values(by=1) + ray_result = ray_df.sort_values(by=1) + + ray_df_equals_pandas(ray_result, pandas_result) + + pandas_result = pandas_df.sort_values(by=1, axis=1) + ray_result = ray_df.sort_values(by=1, axis=1) + + ray_df_equals_pandas(ray_result, pandas_result) + + pandas_result = pandas_df.sort_values(by=[1, 3]) + ray_result = ray_df.sort_values(by=[1, 3]) + + ray_df_equals_pandas(ray_result, pandas_result) + + pandas_result = pandas_df.sort_values(by=[1, 67], axis=1) + ray_result = ray_df.sort_values(by=[1, 67], axis=1) + + ray_df_equals_pandas(ray_result, pandas_result) def test_sortlevel():