Skip to content

Commit

Permalink
FEAT-modin-project#7117: Support building range-partitioning from an …
Browse files Browse the repository at this point in the history
…index level (modin-project#7120)

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev authored Apr 2, 2024
1 parent 3bf5d8f commit 9afc049
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 56 deletions.
39 changes: 32 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,7 @@ def _apply_func_to_range_partitioning(
preserve_columns=False,
data=None,
data_key_columns=None,
level=None,
**kwargs,
):
"""
Expand All @@ -2454,7 +2455,7 @@ def _apply_func_to_range_partitioning(
Parameters
----------
key_columns : list of hashables
Columns to build the range partitioning for.
Columns to build the range partitioning for. Can't be specified along with `level`.
func : callable(pandas.DataFrame) -> pandas.DataFrame
Function to apply against partitions.
ascending : bool, default: True
Expand All @@ -2467,6 +2468,8 @@ def _apply_func_to_range_partitioning(
``df["grouper"] # self`` and ``df["data"] # data``.
data_key_columns : list of hashables, optional
Additional key columns from `data`. Will be combined with `key_columns`.
level : list of ints or labels, optional
Index level(s) to build the range partitioning for. Can't be specified along with `key_columns`.
**kwargs : dict
Additional arguments to forward to the range builder function.
Expand Down Expand Up @@ -2575,14 +2578,21 @@ def _apply_func_to_range_partitioning(
key_columns,
ascending[0] if is_list_like(ascending) else ascending,
ideal_num_new_partitions,
level=level,
**kwargs,
)

# here we want to get indices of those partitions that hold the key columns
key_indices = grouper.columns.get_indexer_for(key_columns)
partition_indices = np.unique(
np.digitize(key_indices, np.cumsum(grouper.column_widths))
)
if key_columns:
# here we want to get indices of those partitions that hold the key columns
key_indices = grouper.columns.get_indexer_for(key_columns)
partition_indices = np.unique(
np.digitize(key_indices, np.cumsum(grouper.column_widths))
)
elif level is not None:
# each partition contains an index, so taking the first one
partition_indices = [0]
else:
raise ValueError("Must specify either 'level' or 'key_columns'")

new_partitions = grouper._partition_mgr_cls.shuffle_partitions(
new_partitions,
Expand Down Expand Up @@ -4038,6 +4048,10 @@ def groupby(
duplicated_suffix = "__duplicated_suffix__"
duplicated_pattern = r"_[\d]*__duplicated_suffix__"
kwargs["observed"] = True
level = kwargs.get("level")

if level is not None and not isinstance(level, list):
level = [level]

def apply_func(df): # pragma: no cover
if has_external_grouper:
Expand Down Expand Up @@ -4081,6 +4095,12 @@ def apply_func(df): # pragma: no cover

if series_groupby:
df = df.squeeze(axis=1)

if kwargs.get("level") is not None:
assert len(by) == 0
# passing an empty list triggers an error
by = None

result = operator(df.groupby(by, **kwargs))

if align_result_columns and df.empty and result.empty:
Expand Down Expand Up @@ -4135,6 +4155,7 @@ def apply_func(df): # pragma: no cover
func=apply_func,
data=data,
data_key_columns=data_key_columns,
level=level,
)
# no need aligning columns if there's only one row partition
if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1:
Expand Down Expand Up @@ -4293,7 +4314,11 @@ def join_cols(df, *cols):
row_lengths=result._row_lengths_cache,
)

if not result.has_materialized_index and not has_external_grouper:
if (
not result.has_materialized_index
and not has_external_grouper
and level is None
):
by_dtypes = ModinDtypes(self._dtypes).lazy_get(internal_by)
if by_dtypes.is_materialized:
new_index = ModinIndex(value=result, axis=0, dtypes=by_dtypes)
Expand Down
74 changes: 63 additions & 11 deletions modin/core/dataframe/pandas/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,34 +114,42 @@ class ShuffleSortFunctions(ShuffleFunctions):
----------
modin_frame : PandasDataframe
The frame to build the range-partitioning for.
columns : str or list of strings
The column/columns to use as a key.
columns : str, list of strings or None
The column/columns to use as a key. Can't be specified along with `level`.
ascending : bool
Whether the ranges should be in ascending or descending order.
ideal_num_new_partitions : int
The ideal number of new partitions.
level : list of strings or ints, or None
Index level(s) to use as a key. Can't be specified along with `columns`.
**kwargs : dict
Additional keyword arguments.
"""

def __init__(
self,
modin_frame: "PandasDataframe",
columns: Union[str, list],
columns: Optional[Union[str, list]],
ascending: Union[list, bool],
ideal_num_new_partitions: int,
level: Optional[list[Union[str, int]]] = None,
**kwargs: dict,
):
self.frame_len = len(modin_frame)
self.ideal_num_new_partitions = ideal_num_new_partitions
self.columns = columns if is_list_like(columns) else [columns]
self.ascending = ascending
self.kwargs = kwargs.copy()
self.level = level
self.columns_info = None

def sample_fn(self, partition: pandas.DataFrame) -> pandas.DataFrame:
if self.level is not None:
partition = self._index_to_df_zero_copy(partition, self.level)
else:
partition = partition[self.columns]
return self.pick_samples_for_quantiles(
partition[self.columns], self.ideal_num_new_partitions, self.frame_len
partition, self.ideal_num_new_partitions, self.frame_len
)

def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
Expand Down Expand Up @@ -181,6 +189,7 @@ def split_fn(
partition,
self.columns_info,
self.ascending,
keys_are_index_levels=self.level is not None,
**self.kwargs,
)

Expand Down Expand Up @@ -312,6 +321,7 @@ def split_partitions_using_pivots_for_sort(
df: pandas.DataFrame,
columns_info: "list[ColumnInfo]",
ascending: bool,
keys_are_index_levels: bool = False,
**kwargs: dict,
) -> "tuple[pandas.DataFrame, ...]":
"""
Expand All @@ -330,6 +340,8 @@ def split_partitions_using_pivots_for_sort(
Information regarding keys and pivots for range partitioning.
ascending : bool
The ascending flag.
keys_are_index_levels : bool, default: False
Whether `columns_info` describes index levels or actual columns from `df`.
**kwargs : dict
Additional keyword arguments.
Expand All @@ -342,9 +354,14 @@ def split_partitions_using_pivots_for_sort(
# We can return the dataframe with zero changes if there were no pivots passed
return (df,)

na_index = (
df[[col_info.name for col_info in columns_info]].isna().squeeze(axis=1)
key_data = (
ShuffleSortFunctions._index_to_df_zero_copy(
df, [col_info.name for col_info in columns_info]
)
if keys_are_index_levels
else df[[col_info.name for col_info in columns_info]]
)
na_index = key_data.isna().squeeze(axis=1)
if na_index.ndim == 2:
na_index = na_index.any(axis=1)
na_rows = df[na_index]
Expand Down Expand Up @@ -373,22 +390,27 @@ def get_group(grp, key, df):
pivots = pivots[::-1]
group_keys.append(range(len(pivots) + 1))
key = kwargs.pop("key", None)
cols_to_digitize = non_na_rows[col_info.name]
cols_to_digitize = (
non_na_rows.index.get_level_values(col_info.name)
if keys_are_index_levels
else non_na_rows[col_info.name]
)
if key is not None:
cols_to_digitize = key(cols_to_digitize)

if cols_to_digitize.ndim == 2:
cols_to_digitize = cols_to_digitize.squeeze()

if col_info.is_numeric:
groupby_col = np.digitize(cols_to_digitize.squeeze(), pivots)
groupby_col = np.digitize(cols_to_digitize, pivots)
# `np.digitize` returns results based off of the sort order of the pivots it is passed.
# When we only have one unique value in our pivots, `np.digitize` assumes that the pivots
# are sorted in ascending order, and gives us results based off of that assumption - so if
# we actually want to sort in descending order, we need to swap the new indices.
if not ascending and len(np.unique(pivots)) == 1:
groupby_col = len(pivots) - groupby_col
else:
groupby_col = np.searchsorted(
pivots, cols_to_digitize.squeeze(), side="right"
)
groupby_col = np.searchsorted(pivots, cols_to_digitize, side="right")
# Since np.searchsorted requires the pivots to be in ascending order, if we want to sort
# in descending order, we need to swap the new indices.
if not ascending:
Expand Down Expand Up @@ -426,6 +448,36 @@ def get_group(grp, key, df):
).astype(df.dtypes)
return tuple(groups)

@staticmethod
def _index_to_df_zero_copy(
df: pandas.DataFrame, levels: list[Union[str, int]]
) -> pandas.DataFrame:
"""
Convert index `level` of `df` to a ``pandas.DataFrame``.
Parameters
----------
df : pandas.DataFrame
levels : list of labels or ints
Index level to convert to a dataframe.
Returns
-------
pandas.DataFrame
The columns in the resulting dataframe use the same data arrays as the index levels
in the original `df`, so no copies.
"""
# calling 'df.index.to_frame()' creates a copy of the index, so doing the conversion manually
# to avoid the copy
data = {
(
df.index.names[lvl] if isinstance(lvl, int) else lvl
): df.index.get_level_values(lvl)
for lvl in levels
}
index_data = pandas.DataFrame(data, index=df.index, copy=False)
return index_data


def lazy_metadata_decorator(apply_axis=None, axis_arg=-1, transpose=False):
"""
Expand Down
12 changes: 5 additions & 7 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3701,12 +3701,7 @@ def _groupby_shuffle(
by, agg_func, axis, groupby_kwargs, agg_args, agg_kwargs, how, drop
)

if groupby_kwargs.get("level") is not None:
raise NotImplementedError(
"Grouping on an index level is not yet supported by range-partitioning groupby implementation: "
+ "https://github.com/modin-project/modin/issues/5926"
)

grouping_on_level = groupby_kwargs.get("level") is not None
if any(
isinstance(obj, pandas.Grouper)
for obj in (by if isinstance(by, list) else [by])
Expand All @@ -3716,7 +3711,10 @@ def _groupby_shuffle(
+ "https://github.com/modin-project/modin/issues/5926"
)

external_by, internal_by, by_positions = self._groupby_separate_by(by, drop)
if grouping_on_level:
external_by, internal_by, by_positions = [], [], []
else:
external_by, internal_by, by_positions = self._groupby_separate_by(by, drop)

all_external_are_qcs = all(isinstance(obj, type(self)) for obj in external_by)
if not all_external_are_qcs:
Expand Down
Loading

0 comments on commit 9afc049

Please sign in to comment.