Skip to content

Commit

Permalink
fix docstrings and some tests
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Sep 5, 2023
1 parent d8abece commit a4b5a18
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 129 deletions.
13 changes: 8 additions & 5 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from pandas._libs.lib import no_default
from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING

from modin.config import Engine, IsRayCluster
from modin.config import Engine, IsRayCluster, NPartitions
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
from modin.core.storage_formats.pandas.utils import get_length_list
from modin.error_message import ErrorMessage
Expand All @@ -44,7 +44,7 @@
JoinType,
)
from modin.core.dataframe.pandas.dataframe.utils import (
build_sort_functions,
ShuffleSortFunctions,
lazy_metadata_decorator,
)
from modin.core.dataframe.pandas.metadata import (
Expand Down Expand Up @@ -2411,7 +2411,10 @@ def _apply_func_to_range_partitioning(
if self._partitions.shape[0] == 1:
return self.apply_full_axis(axis=1, func=func)

ideal_num_new_partitions = len(self._partitions)
key_columns = key_columns if isinstance(key_columns, list) else [key_columns]

# don't want to inherit over-partitioning so doing this 'min' check
ideal_num_new_partitions = min(len(self._partitions), NPartitions.get())
m = len(self.index) / ideal_num_new_partitions
sampling_probability = (1 / m) * np.log(
ideal_num_new_partitions * len(self.index)
Expand All @@ -2432,7 +2435,7 @@ def _apply_func_to_range_partitioning(
# simply combine all partitions and apply the sorting to the whole dataframe
return self.combine_and_apply(func=func)

shuffling_functions = build_sort_functions(
shuffling_functions = ShuffleSortFunctions(
self,
key_columns,
ascending[0] if is_list_like(ascending) else ascending,
Expand Down Expand Up @@ -2535,7 +2538,7 @@ def sort_function(df): # pragma: no cover
)

result = self._apply_func_to_range_partitioning(
key_columns=columns, func=sort_function, ascending=ascending, **kwargs
key_columns=columns[0], func=sort_function, ascending=ascending, **kwargs
)

result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1)
Expand Down
165 changes: 65 additions & 100 deletions modin/core/dataframe/pandas/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,93 +21,90 @@
import abc

from modin.error_message import ErrorMessage
import modin.config as cfg
from modin.utils import _inherit_docstrings

if TYPE_CHECKING:
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe

ColumnsInfo = namedtuple("ColumnsInfo", ["name", "pivots", "is_numeric"])
ColumnInfo = namedtuple("ColumnInfo", ["name", "pivots", "is_numeric"])


class ShuffleFunctions:
"""
Define an interface for the sampling, quantile-picking, and splitting stages of building a range-partitioning.
Parameters
----------
modin_frame : PandasDataframe
The frame to build the range-partitioning for.
columns : str or list of strings
The column/columns to use as a key.
ascending : bool
Whether the ranges should be in ascending or descending order.
ideal_num_new_partitions : int
The ideal number of bins.
**kwargs : dict
Additional keyword arguments.
"""

def __init__(
self, modin_frame, columns, ascending, ideal_num_new_partitions, **kwargs
):
pass

@abc.abstractmethod
def sample_fn(self, partition: pandas.DataFrame) -> pandas.DataFrame:
"""
Pick samples over the given partition.
Parameters
----------
partition : pandas.DataFrame
Returns
-------
pandas.DataFrame
The samples for the partition.
"""
pass

@abc.abstractmethod
def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
"""
Determine quantiles from the given samples.
Determine quantiles from the given samples and save them for future ``.split_fn()`` calls.
Parameters
----------
samples : list of pandas.DataFrames
Returns
-------
np.ndarray
A list of overall quantiles.
int
The number of bins the ``.split_fn()`` will return.
"""
pass

@abc.abstractmethod
def split_fn(self, partition: pandas.DataFrame) -> "tuple[pandas.DataFrame, ...]":
"""
Split the given dataframe into the partitions specified by `pivots`.
Split the given dataframe into the range-partitions defined by the preceding call to ``.pivot_fn()``.
Parameters
----------
partition : pandas.DataFrame
pivots : np.ndarray
Returns
-------
tuple of pandas.DataFrames
Notes
-----
In order to call this method you must call the ``.pivot_fn()`` first.
"""
pass



def build_sort_functions(
modin_frame: "PandasDataframe",
columns: Union[str, list],
ascending: Union[list, bool],
ideal_num_new_partitions: int,
**kwargs: dict,
) -> ShuffleFunctions:
"""
Return a named tuple containing the functions necessary to perform a sort.
Parameters
----------
modin_frame : PandasDataframe
The frame calling these sort functions.
columns : str or list of strings
The column/columns to sort by.
ascending : bool
The ascending flag.
ideal_num_new_partitions : int
The ideal number of new partitions.
**kwargs : dict
Additional keyword arguments.
Returns
-------
ShuffleFunctions :
A named tuple containing the functions to pick quantiles, choose pivot points, and split
partitions for sorting.
"""
sort_fns = ShuffleSortFunctions(
modin_frame, columns, ascending, ideal_num_new_partitions, **kwargs
)

return sort_fns


@_inherit_docstrings(ShuffleFunctions)
class ShuffleSortFunctions(ShuffleFunctions):
"""
Perform the sampling, quantile-picking, and splitting stages of building a range-partitioning.
Expand Down Expand Up @@ -135,47 +132,22 @@ def __init__(
**kwargs: dict,
):
self.frame_len = len(modin_frame.index)
self.ideal_num_new_partitions = min(
ideal_num_new_partitions, cfg.NPartitions.get()
)
self.ideal_num_new_partitions = ideal_num_new_partitions
self.columns = columns if is_list_like(columns) else [columns]
self.ascending = ascending
self.kwargs = kwargs.copy()
self.columns_info = None

def sample_fn(self, partition: pandas.DataFrame) -> pandas.DataFrame:
"""
Pick samples over the given partition.
Parameters
----------
partition : pandas.DataFrame
Returns
-------
pandas.DataFrame:
The samples for the partition.
"""
return self.pick_samples_for_quantiles(
partition[self.columns], self.ideal_num_new_partitions, self.frame_len
)

def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
"""
Determine quantiles from the given samples.
Parameters
----------
samples : list of pandas.DataFrames
Returns
-------
int
"""
key = self.kwargs.get("key", None)
samples = pandas.concat(samples, axis=0, copy=False)

columns_info : "list[ColumnsInfo]" = []
columns_info: "list[ColumnInfo]" = []
number_of_groups = 1
cols = []
for col in samples.columns:
Expand All @@ -193,27 +165,18 @@ def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
# arithmetic on the values.
method = "inverted_cdf"

pivots = self.pick_pivots_from_samples_for_sort(column_val, num_pivots, method, key)
columns_info.append(ColumnsInfo(col, pivots, is_numeric))
pivots = self.pick_pivots_from_samples_for_sort(
column_val, num_pivots, method, key
)
columns_info.append(ColumnInfo(col, pivots, is_numeric))
number_of_groups *= len(pivots) + 1
self.columns_info = columns_info
return number_of_groups

def split_fn(
self, partition: pandas.DataFrame,
self,
partition: pandas.DataFrame,
) -> "tuple[pandas.DataFrame, ...]":
"""
Split the given dataframe into the partitions specified by `pivots`.
Parameters
----------
partition : pandas.DataFrame
pivots : np.ndarray
Returns
-------
tuple of pandas.DataFrames
"""
ErrorMessage.catch_bugs_and_request_email(
failure_condition=self.columns_info is None,
extra_log="The 'split_fn' doesn't have proper metadata, the probable reason is that it was called before 'pivot_fn'",
Expand Down Expand Up @@ -351,12 +314,12 @@ def pick_pivots_from_samples_for_sort(
@staticmethod
def split_partitions_using_pivots_for_sort(
df: pandas.DataFrame,
columns_info: "list[ColumnsInfo]",
columns_info: "list[ColumnInfo]",
ascending: bool,
**kwargs: dict,
) -> "tuple[pandas.DataFrame, ...]":
"""
Split the given dataframe into the partitions specified by `pivots`.
Split the given dataframe into the partitions specified by `pivots` in `columns_info`.
This function takes as input a row-axis partition, as well as the quantiles determined
by the `pivot_func` defined above. It then splits the input dataframe into NPartitions.get()
Expand All @@ -367,12 +330,8 @@ def split_partitions_using_pivots_for_sort(
----------
df : pandas.Dataframe
The partition to split.
column : str
The major column to sort by.
is_numeric_column : bool
Whether the passed `column` has numeric type (int, float).
pivots : np.ndarray
The quantiles to use to split the data.
columns_info : list of ColumnInfo
Information regarding keys and pivots for range partitioning.
ascending : bool
The ascending flag.
**kwargs : dict
Expand All @@ -386,9 +345,6 @@ def split_partitions_using_pivots_for_sort(
if len(columns_info) == 0:
# We can return the dataframe with zero changes if there were no pivots passed
return (df,)
# If `ascending=False` and we are dealing with a numeric dtype, we can pass in a reversed list
# of pivots, and `np.digitize` will work correctly. For object dtypes, we use `np.searchsorted`
# which breaks when we reverse the pivots.

na_index = (
df[[col_info.name for col_info in columns_info]].isna().squeeze(axis=1)
Expand All @@ -399,6 +355,7 @@ def split_partitions_using_pivots_for_sort(
non_na_rows = df[~na_index]

def get_group(grp, key, df):
"""Get a group with the `key` from the `grp`, if it doesn't exist return an empty slice of `df`."""
try:
return grp.get_group(key)
except KeyError:
Expand All @@ -410,6 +367,11 @@ def get_group(grp, key, df):
group_keys = []
for col_info in columns_info:
pivots = col_info.pivots
if len(pivots) == 0:
continue
# If `ascending=False` and we are dealing with a numeric dtype, we can pass in a reversed list
# of pivots, and `np.digitize` will work correctly. For object dtypes, we use `np.searchsorted`
# which breaks when we reverse the pivots.
if not ascending and col_info.is_numeric:
# `key` is already applied to `pivots` in the `pick_pivots_from_samples_for_sort` function.
pivots = pivots[::-1]
Expand Down Expand Up @@ -437,10 +399,13 @@ def get_group(grp, key, df):
groupby_col = len(pivots) - groupby_col
groupby_codes.append(groupby_col)

if len(group_keys) > 1:
group_keys = pandas.MultiIndex.from_product(group_keys)
else:
if len(group_keys) == 0:
# We can return the dataframe with zero changes if there were no pivots passed
return (df,)
elif len(group_keys) == 1:
group_keys = group_keys[0]
else:
group_keys = pandas.MultiIndex.from_product(group_keys)

if len(non_na_rows) == 1:
groups = [
Expand Down
Loading

0 comments on commit a4b5a18

Please sign in to comment.