diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py
index 904a2809b4f..fffd69f00f2 100644
--- a/modin/core/dataframe/pandas/partitioning/partition_manager.py
+++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -766,6 +766,53 @@ def to_numpy(cls, partitions, **kwargs):
             [[block.to_numpy(**kwargs) for block in row] for row in partitions]
         )
 
+    @classmethod
+    def split_pandas_df_into_partitions(
+        cls, df, row_chunksize, col_chunksize, update_bar
+    ):
+        """
+        Split the given pandas DataFrame according to the row/column chunk sizes into distributed partitions.
+
+        Parameters
+        ----------
+        df : pandas.DataFrame
+        row_chunksize : int
+        col_chunksize : int
+        update_bar : callable(x) -> x
+            Function that updates a progress bar.
+
+        Returns
+        -------
+        2D np.ndarray[PandasDataframePartition]
+        """
+        put_func = cls._partition_class.put
+        # even a full-axis slice can cost something (https://github.com/pandas-dev/pandas/issues/55202),
+        # so we try not to do it unless necessary.
+        # FIXME: it appears that this optimization doesn't work correctly for Unidist, as it
+        # doesn't explicitly copy the data when putting it into storage (as the other engines do),
+        # causing it to eventually share memory with the pandas object that was provided by the user.
+        # Everything works fine if we do this column slicing, as pandas then apparently sets some flags
+        # to operate in CoW mode (and so it doesn't crash our tests).
+        # @YarShev promised that this will eventually be fixed on Unidist's side, but for now
+        # we keep this hacky condition.
+        if col_chunksize >= len(df.columns) and Engine.get() != "Unidist":
+            col_parts = [df]
+        else:
+            col_parts = [
+                df.iloc[:, i : i + col_chunksize]
+                for i in range(0, len(df.columns), col_chunksize)
+            ]
+        parts = [
+            [
+                update_bar(
+                    put_func(col_part.iloc[i : i + row_chunksize]),
+                )
+                for col_part in col_parts
+            ]
+            for i in range(0, len(df), row_chunksize)
+        ]
+        return np.array(parts)
+
     @classmethod
     @wait_computations_if_benchmark_mode
     def from_pandas(cls, df, return_dims=False):
@@ -786,13 +833,7 @@ def from_pandas(cls, df, return_dims=False):
             A NumPy array with partitions (with dimensions or not).
         """
 
-        def update_bar(pbar, f):
-            if ProgressBar.get():
-                pbar.update(1)
-            return f
-
         num_splits = NPartitions.get()
-        put_func = cls._partition_class.put
         row_chunksize = compute_chunksize(df.shape[0], num_splits)
         col_chunksize = compute_chunksize(df.shape[1], num_splits)
 
@@ -820,36 +861,18 @@ def update_bar(pbar, f):
         else:
             pbar = None
 
-        # even a full-axis slice can cost something (https://github.com/pandas-dev/pandas/issues/55202)
-        # so we try not to do it if unnecessary.
-        # FIXME: it appears that this optimization doesn't work for Unidist correctly as it
-        # doesn't explicitly copy the data when putting it into storage (as the rest engines do)
-        # causing it to eventially share memory with a pandas object that was provided by user.
-        # Everything works fine if we do this column slicing as pandas then would set some flags
-        # to perform in COW mode apparently (and so it wouldn't crash our tests).
-        # @YarShev promised that this will be eventially fixed on Unidist's side, but for now there's
-        # this hacky condition
-        if col_chunksize >= len(df.columns) and Engine.get() != "Unidist":
-            col_parts = [df]
-        else:
-            col_parts = [
-                df.iloc[:, i : i + col_chunksize]
-                for i in range(0, len(df.columns), col_chunksize)
-            ]
-        parts = [
-            [
-                update_bar(
-                    pbar,
-                    put_func(col_part.iloc[i : i + row_chunksize]),
-                )
-                for col_part in col_parts
-            ]
-            for i in range(0, len(df), row_chunksize)
-        ]
+        def update_bar(f):
+            if ProgressBar.get():
+                pbar.update(1)
+            return f
+
+        parts = cls.split_pandas_df_into_partitions(
+            df, row_chunksize, col_chunksize, update_bar
+        )
         if ProgressBar.get():
             pbar.close()
         if not return_dims:
-            return np.array(parts)
+            return parts
         else:
             row_lengths = [
                 row_chunksize
@@ -863,7 +886,7 @@ def update_bar(pbar, f):
                 else len(df.columns) % col_chunksize or col_chunksize
                 for i in range(0, len(df.columns), col_chunksize)
             ]
-            return np.array(parts), row_lengths, col_widths
+            return parts, row_lengths, col_widths
 
     @classmethod
     def from_arrow(cls, at, return_dims=False):
diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
index 26164984fa7..bc9dce0ebab 100644
--- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
+++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
@@ -13,11 +13,15 @@
 """Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray."""
 
+import numpy as np
+from pandas.core.dtypes.common import is_numeric_dtype
+
 from modin.core.execution.modin_aqp import progress_bar_wrapper
 from modin.core.execution.ray.common import RayWrapper
 from modin.core.execution.ray.generic.partitioning import (
     GenericRayDataframePartitionManager,
 )
+from modin.utils import _inherit_docstrings
 
 from .partition import PandasOnRayDataframePartition
 from .virtual_partition import (
@@ -51,6 +55,47 @@ def wait_partitions(cls, partitions):
             [block for partition in partitions for block in partition.list_of_blocks]
         )
 
+    @classmethod
+    @_inherit_docstrings(
+        GenericRayDataframePartitionManager.split_pandas_df_into_partitions
+    )
+    def split_pandas_df_into_partitions(
+        cls, df, row_chunksize, col_chunksize, update_bar
+    ):
+        # It was found that, starting from roughly 6 million elements, distributed
+        # dataframe splitting is more beneficial for Ray in the case of numerical data.
+        distributed_splitting = (len(df) * len(df.columns)) > 6_000_000 and all(
+            is_numeric_dtype(dtype) for dtype in df.dtypes
+        )
+
+        if not distributed_splitting:
+            return super().split_pandas_df_into_partitions(
+                df, row_chunksize, col_chunksize, update_bar
+            )
+
+        put_func = cls._partition_class.put
+
+        def mask(part, row_loc, col_loc):
+            # A 2D iloc is surprisingly slow, so we use chained iloc calls instead:
+            # https://github.com/pandas-dev/pandas/issues/55202
+            return part.apply(lambda df: df.iloc[row_loc, :].iloc[:, col_loc])
+
+        main_part = put_func(df)
+        parts = [
+            [
+                update_bar(
+                    mask(
+                        main_part,
+                        slice(i, i + row_chunksize),
+                        slice(j, j + col_chunksize),
+                    ),
+                )
+                for j in range(0, len(df.columns), col_chunksize)
+            ]
+            for i in range(0, len(df), row_chunksize)
+        ]
+        return np.array(parts)
+
 
 def _make_wrapped_method(name: str):
     """
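
For readers skimming the patch, the base splitting strategy is: slice the frame into full-height column slabs first, then cut each slab by rows, and `put` every block. A minimal standalone sketch of that pattern in plain pandas (`split_into_blocks` is a hypothetical helper, not Modin API; the progress-bar hook, `put_func`, and the full-axis-slice shortcut from the patch are omitted):

```python
import pandas as pd

def split_into_blocks(df, row_chunksize, col_chunksize):
    # Columns first (full-height slabs), then rows -- the same order
    # split_pandas_df_into_partitions uses in the patch above.
    col_parts = [
        df.iloc[:, j : j + col_chunksize]
        for j in range(0, len(df.columns), col_chunksize)
    ]
    return [
        [col_part.iloc[i : i + row_chunksize] for col_part in col_parts]
        for i in range(0, len(df), row_chunksize)
    ]

df = pd.DataFrame({c: range(6) for c in "abcdef"})  # 6 rows x 6 columns
blocks = split_into_blocks(df, row_chunksize=4, col_chunksize=4)
print(len(blocks), len(blocks[0]))  # 2 2 -> a 2x2 grid of blocks
print(blocks[1][1].shape)           # (2, 2) -> the ragged corner block
```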
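
The Ray override switches to a "put once, slice remotely" scheme: the whole frame is serialized to the object store a single time, and each block is carved out by a remote task using the chained-iloc workaround. A rough sketch of that idea against Ray directly (not Modin's partition API; the `mask` task, sizes, and names are illustrative):

```python
import numpy as np
import pandas as pd
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def mask(df, row_loc, col_loc):
    # Chained iloc rather than one 2D iloc, which can be surprisingly
    # slow (https://github.com/pandas-dev/pandas/issues/55202).
    return df.iloc[row_loc, :].iloc[:, col_loc]

df = pd.DataFrame(np.random.rand(1_000, 8))
main_ref = ray.put(df)  # one serialization for the whole frame

refs = [
    [
        mask.remote(main_ref, slice(i, i + 500), slice(j, j + 4))
        for j in range(0, len(df.columns), 4)
    ]
    for i in range(0, len(df), 500)
]
blocks = [[ray.get(r) for r in row] for row in refs]
print(blocks[1][1].shape)  # (500, 4)
```

The win comes from avoiding one driver-side slice-and-put per block when the data is large and numeric; below the roughly 6-million-element threshold the patch falls back to the base implementation.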
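
Finally, the `row_lengths`/`col_widths` lists in `from_pandas` rely on the `% ... or ...` idiom: every chunk is `chunksize` long except possibly the last, whose length is the remainder, and when the axis divides evenly the remainder is 0, so `or` falls back to the full chunk size. A small sketch (hypothetical `chunk_lengths` helper, plain Python):

```python
def chunk_lengths(total, chunksize):
    # Mirrors the list comprehensions in from_pandas above.
    return [
        chunksize if i + chunksize < total else total % chunksize or chunksize
        for i in range(0, total, chunksize)
    ]

print(chunk_lengths(10, 4))  # [4, 4, 2]
print(chunk_lengths(8, 4))   # [4, 4] -- even split, so `or` supplies 4
```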