Commit

Apply @vnlitvinov's suggestions
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Sep 6, 2023
1 parent ec2e14c commit b679b64
Showing 3 changed files with 22 additions and 39 deletions.
25 changes: 14 additions & 11 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2393,8 +2393,8 @@ def _apply_func_to_range_partitioning(
 
         Parameters
         ----------
-        key_columns : hashable or list of hashables
-            Column/columns to build the range partitioning for.
+        key_columns : list of hashables
+            Columns to build the range partitioning for.
         func : callable(pandas.DataFrame) -> pandas.DataFrame
             Function to apply against partitions.
         ascending : bool, default: True
@@ -2411,8 +2411,6 @@
         if self._partitions.shape[0] == 1:
             return self.apply_full_axis(axis=1, func=func)
 
-        key_columns = key_columns if isinstance(key_columns, list) else [key_columns]
-
         # don't want to inherit over-partitioning so doing this 'min' check
         ideal_num_new_partitions = min(len(self._partitions), NPartitions.get())
         m = len(self.index) / ideal_num_new_partitions
@@ -2441,13 +2439,10 @@
                 self._partitions, ideal_num_new_partitions
             )
         else:
+            step = round(len(self._partitions) / ideal_num_new_partitions)
             joining_partitions = np.split(
                 self._partitions,
-                range(
-                    0,
-                    len(self._partitions),
-                    round(len(self._partitions) / ideal_num_new_partitions),
-                )[1:],
+                range(step, len(self._partitions), step),
             )
 
         new_partitions = np.array(
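
The simplified branch is easier to follow once you see what np.split receives. Below is a minimal, self-contained sketch (hypothetical sizes, plain arrays instead of the real partition objects) showing that the precomputed step yields the same split points as the old range(...)[1:] expression:

import numpy as np

partitions = np.arange(10)                    # stand-in for self._partitions along axis 0
ideal_num_new_partitions = 4                  # hypothetical target partition count

step = round(len(partitions) / ideal_num_new_partitions)   # 2
new_points = range(step, len(partitions), step)            # 2, 4, 6, 8
old_points = range(0, len(partitions), step)[1:]           # same points, minus the leading 0
assert list(new_points) == list(old_points)

joining_partitions = np.split(partitions, new_points)      # 5 groups of 2 row partitions
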
@@ -2471,10 +2466,18 @@
             **kwargs,
         )
 
+        # here we want to get indices of those partitions that hold the key columns;
+        # first we translate column labels into their numeric indices
         key_indices = self.columns.get_indexer_for(key_columns)
-        partition_indices = set()
+        # 'indices' will show us partition boundaries, helping to understand which
+        # column belongs to which partition. For example if 'indices = [0, 5, 10, 15]'
+        # then we know that columns with indices (0-4) are located in part#0;
+        # columns with indices (5-9) are located in part#1 and so on...
         indices = np.cumsum([0] + self.column_widths)
+        # 'partition_indices' will store partition ids that hold the key columns
+        partition_indices = set()
         for i in range(len(indices) - 1):
+            # going through the key columns and check whether they belong to the part#i
             for key_idx in key_indices:
                 if key_idx >= indices[i] and key_idx < indices[i + 1]:
                     partition_indices.add(i)
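
The new comments describe a cumsum-based lookup from key-column positions to the column partitions that hold them. A self-contained sketch of the same logic, using made-up column widths and key positions:

import numpy as np

column_widths = [5, 5, 5]                  # hypothetical: three column partitions, 5 columns each
key_indices = [3, 7]                       # positional indices of the key columns

indices = np.cumsum([0] + column_widths)   # [0, 5, 10, 15] -> partition boundaries
partition_indices = set()
for i in range(len(indices) - 1):
    for key_idx in key_indices:
        if indices[i] <= key_idx < indices[i + 1]:
            partition_indices.add(i)

print(partition_indices)                   # {0, 1}: column 3 lives in part#0, column 7 in part#1
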
@@ -2543,7 +2546,7 @@ def sort_function(df):  # pragma: no cover
         )
 
         result = self._apply_func_to_range_partitioning(
-            key_columns=columns[0], func=sort_function, ascending=ascending, **kwargs
+            key_columns=[columns[0]], func=sort_function, ascending=ascending, **kwargs
         )
 
         result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1)
13 changes: 4 additions & 9 deletions modin/core/dataframe/pandas/dataframe/utils.py
@@ -42,7 +42,7 @@ class ShuffleFunctions:
     ascending : bool
         Whether the ranges should be in ascending or descending order.
     ideal_num_new_partitions : int
-        The ideal number of bins.
+        The ideal number of new partitions.
     **kwargs : dict
         Additional keyword arguments.
     """
@@ -118,7 +118,7 @@ class ShuffleSortFunctions(ShuffleFunctions):
     ascending : bool
         Whether the ranges should be in ascending or descending order.
     ideal_num_new_partitions : int
-        The ideal number of bins.
+        The ideal number of new partitions.
     **kwargs : dict
         Additional keyword arguments.
     """
@@ -158,13 +158,8 @@ def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
             cols.append(col)
             is_numeric = is_numeric_dtype(column_val.dtype)
 
-            if is_numeric:
-                method = "linear"
-            else:
-                # This means we are not sorting numbers, so we need our quantiles to not try
-                # arithmetic on the values.
-                method = "inverted_cdf"
-
+            # When we are not sorting numbers, we need our quantiles to not do arithmetic on the values
+            method = "linear" if is_numeric else "inverted_cdf"
             pivots = self.pick_pivots_from_samples_for_sort(
                 column_val, num_pivots, method, key
             )
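
The collapsed conditional keeps the original intent: "linear" interpolates between sample values, while "inverted_cdf" always returns an element of the sample itself, so no arithmetic is done on non-numeric keys. A quick NumPy illustration (numeric data, used only to show the difference between the two methods; the method= keyword requires NumPy >= 1.22):

import numpy as np

sample = np.array([1, 2, 3, 4])
print(np.quantile(sample, 0.5, method="linear"))        # 2.5 -- interpolated value
print(np.quantile(sample, 0.5, method="inverted_cdf"))  # 2   -- an actual element of the sample
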
23 changes: 4 additions & 19 deletions modin/test/storage_formats/pandas/test_internals.py
@@ -21,6 +21,10 @@
 from modin.distributed.dataframe.pandas import from_partitions
 from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
 from modin.utils import try_cast_to_pandas
+from modin.core.dataframe.pandas.dataframe.utils import (
+    ShuffleSortFunctions,
+    ColumnInfo,
+)
 
 import numpy as np
 import pandas
@@ -841,11 +845,6 @@ def test_split_partitions_kernel(
     Duplicate pivot values cause empty partitions to be produced. This parameter helps
     to verify that the function still behaves correctly in such cases.
     """
-    from modin.core.dataframe.pandas.dataframe.utils import (
-        ShuffleSortFunctions,
-        ColumnInfo,
-    )
-
     random_state = np.random.RandomState(42)
 
     df = pandas.DataFrame(
@@ -909,11 +908,6 @@ def test_split_partitions_with_empty_pivots(col_name, ascending):
     This test verifies that the splitting function performs correctly when an empty pivots list is passed.
     The expected behavior is to return a single split consisting of the exact copy of the input dataframe.
     """
-    from modin.core.dataframe.pandas.dataframe.utils import (
-        ShuffleSortFunctions,
-        ColumnInfo,
-    )
-
     df = pandas.DataFrame(
         {
             "numeric_col": range(9),
@@ -949,10 +943,6 @@ def test_shuffle_partitions_with_empty_pivots(ascending):
 
     assert modin_frame._partitions.shape == (1, 1)
 
-    from modin.core.dataframe.pandas.dataframe.utils import (
-        ShuffleSortFunctions,
-    )
-
     column_name = modin_frame.columns[1]
 
     shuffle_functions = ShuffleSortFunctions(
@@ -982,11 +972,6 @@ def test_split_partition_preserve_names(ascending):
     This test verifies that the dataframes being split by ``split_partitions_using_pivots_for_sort``
     preserve their index/column names.
     """
-    from modin.core.dataframe.pandas.dataframe.utils import (
-        ShuffleSortFunctions,
-        ColumnInfo,
-    )
-
     df = pandas.DataFrame(
         {
             "numeric_col": range(9),
