PERF-modin-project#2813: Distributed 'from_pandas()' for numerical data in Ray

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Oct 10, 2023
1 parent a20efa6 commit d081328
Showing 2 changed files with 102 additions and 34 deletions.
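For context, here is a minimal sketch (not part of this commit) of a user-facing call that should reach the new code path. Engine, modin.config, and modin.pandas are real Modin APIs; the frame shape is only chosen to cross the ~6-million-element threshold introduced below.

import numpy as np
import pandas

import modin.config as cfg
import modin.pandas as pd

cfg.Engine.put("Ray")  # the distributed splitting added here is Ray-only

# A purely numerical frame with more than ~6 million elements should take the
# new distributed splitting path inside from_pandas(); smaller or mixed-dtype
# frames keep the sequential iloc-based splitting.
source = pandas.DataFrame(np.random.rand(2_000_000, 4))  # 8M elements, numeric
df = pd.DataFrame(source)  # construction goes through from_pandas()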
91 changes: 57 additions & 34 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -766,6 +766,53 @@ def to_numpy(cls, partitions, **kwargs):
             [[block.to_numpy(**kwargs) for block in row] for row in partitions]
         )
 
+    @classmethod
+    def split_pandas_df_into_partitions(
+        cls, df, row_chunksize, col_chunksize, update_bar
+    ):
+        """
+        Split given pandas DataFrame according to the row/column chunk sizes into distributed partitions.
+
+        Parameters
+        ----------
+        df : pandas.DataFrame
+        row_chunksize : int
+        col_chunksize : int
+        update_bar : callable(x) -> x
+            Function that updates a progress bar.
+
+        Returns
+        -------
+        2D np.ndarray[PandasDataframePartition]
+        """
+        put_func = cls._partition_class.put
+        # Even a full-axis slice can cost something (https://github.com/pandas-dev/pandas/issues/55202),
+        # so we try not to do it when it is unnecessary.
+        # FIXME: this optimization appears to work incorrectly with Unidist, which does not
+        # explicitly copy the data when putting it into storage (as the other engines do),
+        # causing the partition to eventually share memory with the pandas object provided
+        # by the user. Everything works fine if we do this column slicing, as pandas then
+        # apparently sets flags to operate in copy-on-write (COW) mode (so it doesn't crash
+        # our tests). @YarShev promised that this will eventually be fixed on Unidist's
+        # side, but for now we keep this hacky condition.
+        if col_chunksize >= len(df.columns) and Engine.get() != "Unidist":
+            col_parts = [df]
+        else:
+            col_parts = [
+                df.iloc[:, i : i + col_chunksize]
+                for i in range(0, len(df.columns), col_chunksize)
+            ]
+        parts = [
+            [
+                update_bar(
+                    put_func(col_part.iloc[i : i + row_chunksize]),
+                )
+                for col_part in col_parts
+            ]
+            for i in range(0, len(df), row_chunksize)
+        ]
+        return np.array(parts)
+
     @classmethod
     @wait_computations_if_benchmark_mode
     def from_pandas(cls, df, return_dims=False):
@@ -786,13 +833,7 @@ def from_pandas(cls, df, return_dims=False):
             A NumPy array with partitions (with dimensions or not).
         """
 
-        def update_bar(pbar, f):
-            if ProgressBar.get():
-                pbar.update(1)
-            return f
-
         num_splits = NPartitions.get()
-        put_func = cls._partition_class.put
         row_chunksize = compute_chunksize(df.shape[0], num_splits)
         col_chunksize = compute_chunksize(df.shape[1], num_splits)
 
@@ -820,36 +861,18 @@ def update_bar(pbar, f):
         else:
             pbar = None
 
-        # Even a full-axis slice can cost something (https://github.com/pandas-dev/pandas/issues/55202),
-        # so we try not to do it when it is unnecessary.
-        # FIXME: this optimization appears to work incorrectly with Unidist, which does not
-        # explicitly copy the data when putting it into storage (as the other engines do),
-        # causing the partition to eventually share memory with the pandas object provided
-        # by the user. Everything works fine if we do this column slicing, as pandas then
-        # apparently sets flags to operate in copy-on-write (COW) mode (so it doesn't crash
-        # our tests). @YarShev promised that this will eventually be fixed on Unidist's
-        # side, but for now we keep this hacky condition.
-        if col_chunksize >= len(df.columns) and Engine.get() != "Unidist":
-            col_parts = [df]
-        else:
-            col_parts = [
-                df.iloc[:, i : i + col_chunksize]
-                for i in range(0, len(df.columns), col_chunksize)
-            ]
-        parts = [
-            [
-                update_bar(
-                    pbar,
-                    put_func(col_part.iloc[i : i + row_chunksize]),
-                )
-                for col_part in col_parts
-            ]
-            for i in range(0, len(df), row_chunksize)
-        ]
+        def update_bar(f):
+            if ProgressBar.get():
+                pbar.update(1)
+            return f
+
+        parts = cls.split_pandas_df_into_partitions(
+            df, row_chunksize, col_chunksize, update_bar
+        )
         if ProgressBar.get():
             pbar.close()
         if not return_dims:
-            return np.array(parts)
+            return parts
         else:
             row_lengths = [
                 row_chunksize
@@ -863,7 +886,7 @@ def update_bar(pbar, f):
                 else len(df.columns) % col_chunksize or col_chunksize
                 for i in range(0, len(df.columns), col_chunksize)
             ]
-            return np.array(parts), row_lengths, col_widths
+            return parts, row_lengths, col_widths
 
     @classmethod
     def from_arrow(cls, at, return_dims=False):
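Before the second file, a short aside: the helper extracted above keeps from_pandas()'s original chunking scheme. Here is a minimal, self-contained sketch of that scheme in plain pandas (the engine's put() is dropped and the chunk sizes are hypothetical stand-ins for what compute_chunksize() derives from NPartitions):

import numpy as np
import pandas as pd

df = pd.DataFrame(np.arange(100).reshape(10, 10))
row_chunksize, col_chunksize = 4, 5  # hypothetical values

# Column-first slicing, then row slicing within each column part, mirroring
# the loop structure of split_pandas_df_into_partitions().
col_parts = [
    df.iloc[:, j : j + col_chunksize]
    for j in range(0, len(df.columns), col_chunksize)
]
grid = [
    [col_part.iloc[i : i + row_chunksize] for col_part in col_parts]
    for i in range(0, len(df), row_chunksize)
]
print(len(grid), len(grid[0]))  # 3 2 -> a ceil(10/4) x ceil(10/5) partition grid
print(grid[2][1].shape)         # (2, 5) -> the shorter bottom-edge partition

With return_dims=True, from_pandas() would report row_lengths == [4, 4, 2] and col_widths == [5, 5] for this grid, which is exactly what the modulo arithmetic at the end of the diff above computes.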
45 changes: 45 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py
@@ -13,11 +13,15 @@
 
 """Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray."""
 
+import numpy as np
+from pandas.core.dtypes.common import is_numeric_dtype
+
 from modin.core.execution.modin_aqp import progress_bar_wrapper
 from modin.core.execution.ray.common import RayWrapper
 from modin.core.execution.ray.generic.partitioning import (
     GenericRayDataframePartitionManager,
 )
+from modin.utils import _inherit_docstrings
 
 from .partition import PandasOnRayDataframePartition
 from .virtual_partition import (
@@ -51,6 +55,47 @@ def wait_partitions(cls, partitions):
             [block for partition in partitions for block in partition.list_of_blocks]
         )
 
+    @classmethod
+    @_inherit_docstrings(
+        GenericRayDataframePartitionManager.split_pandas_df_into_partitions
+    )
+    def split_pandas_df_into_partitions(
+        cls, df, row_chunksize, col_chunksize, update_bar
+    ):
+        # It was found that, starting from roughly 6 million elements, it is more
+        # beneficial to split the dataframe in a distributed way on Ray in the case
+        # of numerical data.
+        distributed_splitting = (len(df) * len(df.columns)) > 6_000_000 and all(
+            is_numeric_dtype(dtype) for dtype in df.dtypes
+        )
+
+        if not distributed_splitting:
+            return super().split_pandas_df_into_partitions(
+                df, row_chunksize, col_chunksize, update_bar
+            )
+
+        put_func = cls._partition_class.put
+
+        def mask(part, row_loc, col_loc):
+            # 2D iloc is surprisingly slow, so we do chained iloc calls instead:
+            # https://github.com/pandas-dev/pandas/issues/55202
+            return part.apply(lambda df: df.iloc[row_loc, :].iloc[:, col_loc])
+
+        main_part = put_func(df)
+        parts = [
+            [
+                update_bar(
+                    mask(
+                        main_part,
+                        slice(i, i + row_chunksize),
+                        slice(j, j + col_chunksize),
+                    ),
+                )
+                for j in range(0, len(df.columns), col_chunksize)
+            ]
+            for i in range(0, len(df), row_chunksize)
+        ]
+        return np.array(parts)
+
 
 def _make_wrapped_method(name: str):
     """
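Two performance assumptions drive the Ray override above: chained 1D iloc calls beat a single 2D iloc call (pandas-dev/pandas#55202), and for large numeric frames it is cheaper to put the whole frame into Ray's object store once and slice it inside remote tasks than to serialize every slice from the driver. A rough micro-benchmark sketch of the first claim (plain pandas, no Modin; absolute timings are machine-dependent):

import timeit

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(2_000_000, 10))
rows, cols = slice(0, 500_000), slice(0, 5)

# The single 2D iloc that mask() avoids vs. the chained form it uses.
t_2d = timeit.timeit(lambda: df.iloc[rows, cols], number=20)
t_chained = timeit.timeit(lambda: df.iloc[rows, :].iloc[:, cols], number=20)
print(f"2D iloc: {t_2d:.3f}s  chained iloc: {t_chained:.3f}s")

If the inline comments hold, the chained form should come out faster. The 6_000_000-element cutoff for taking the distributed path was, per the comment in the diff, found empirically, and it applies only when every column dtype is numeric.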
