Skip to content

Commit

Permalink
PERF-modin-project#6433: Implement '.dropna()' using map-reduce pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Aug 8, 2023
1 parent f24ba6f commit 5a4b058
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 17 deletions.
79 changes: 63 additions & 16 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ def _set_axis_lengths_cache(self, value, axis=0):
else:
self._column_widths_cache = value

def _get_axis_lengths_cache(self, axis=0):
    """
    Return the cached partition lengths along the given axis, if available.

    Parameters
    ----------
    axis : int, default 0
        0 - return the row lengths cache; any other value returns the
        column widths cache.

    Returns
    -------
    list of ints or None
        The stored cache for the requested axis: a list of ints if it has
        been computed, ``None`` otherwise.
    """
    if axis == 0:
        return self._row_lengths_cache
    return self._column_widths_cache

@property
def has_dtypes_cache(self):
"""
Expand Down Expand Up @@ -2814,7 +2830,14 @@ def apply_select_indices(

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply(
self, axis, func, other, join_type="left", labels="keep", dtypes=None
self,
axis,
func,
other,
join_type="left",
copartition=True,
labels="keep",
dtypes=None,
):
"""
Broadcast axis partitions of `other` to partitions of `self` and apply a function.
Expand All @@ -2829,6 +2852,13 @@ def broadcast_apply(
Modin DataFrame to broadcast.
join_type : str, default: "left"
Type of join to apply.
copartition : bool, default: True
Whether to align indices/partitioning of the `self` and `other` frames.
Disabling this may save some time; however, you have to be 100% sure that
the indexing and partitioning are identical along the broadcasting axis.
This might be the case, for example, if `other` is a projection of `self`
or vice-versa. If copartitioning is disabled and partitioning/indexing are
incompatible, then you may end up with undefined behavior.
labels : {"keep", "replace", "drop"}, default: "keep"
Whether keep labels from `self` Modin DataFrame, replace them with labels
from joined DataFrame or drop altogether to make them be computed lazily later.
Expand All @@ -2840,17 +2870,28 @@ def broadcast_apply(
PandasDataframe
New Modin DataFrame.
"""
# Only sort the indices if they do not match
(
left_parts,
right_parts,
joined_index,
partition_sizes_along_axis,
) = self._copartition(
axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis])
)
# unwrap list returned by `copartition`.
right_parts = right_parts[0]
if copartition:
# Only sort the indices if they do not match
(
left_parts,
right_parts,
joined_index,
partition_sizes_along_axis,
) = self._copartition(
axis,
other,
join_type,
sort=not self.axes[axis].equals(other.axes[axis]),
)
# unwrap list returned by `copartition`.
right_parts = right_parts[0]
else:
left_parts = self._partitions
right_parts = other._partitions
partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache(
axis
), self.copy_axis_cache(axis)

new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)
Expand All @@ -2868,13 +2909,19 @@ def _pick_axis(get_axis, sizes_cache):
if axis == 0:
# Pass shape caches instead of values in order to not trigger shape computation.
new_index, new_row_lengths = _pick_axis(
self._get_index, self._row_lengths_cache
self.copy_index_cache, self._row_lengths_cache
)
new_columns, new_column_widths = (
self.copy_columns_cache(),
self._column_widths_cache,
)
new_columns, new_column_widths = self.columns, self._column_widths_cache
else:
new_index, new_row_lengths = self.index, self._row_lengths_cache
new_index, new_row_lengths = (
self.copy_index_cache(),
self._row_lengths_cache,
)
new_columns, new_column_widths = _pick_axis(
self._get_columns, self._column_widths_cache
self.copy_columns_cache, self._column_widths_cache
)

return self.__constructor__(
Expand Down
47 changes: 47 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2828,6 +2828,53 @@ def setitem_builder(df, internal_indices=[]): # pragma: no cover
# Drop/Dropna
# This will change the shape of the resulting data.
def dropna(self, **kwargs):
if kwargs.get("axis", 0) == 1 and kwargs.get("thresh", no_default) in (
no_default,
None,
):
how = kwargs.get("how", "any")
subset = kwargs.get("subset")
how = "any" if how in (no_default, None) else how
condition = lambda df: getattr(df, how)() # noqa: E731 (lambda assignment)

def mapper(df: pandas.DataFrame):
    """
    Build a one-row frame of per-column NaN flags for the given frame.

    The flags are obtained by applying ``condition`` (taken from the
    enclosing scope) to ``df.isna()``. When ``subset`` (also from the
    enclosing scope) is provided, only the rows of `df` whose labels appear
    in ``subset`` are inspected.
    """
    if subset is None:
        return condition(df.isna()).to_frame().T

    # Restrict the NaN check to the rows of this frame that are named in ``subset``.
    rows_in_subset = df.index.intersection(subset)
    subset_mask = condition(df.loc[rows_in_subset].isna())

    # Start from an all-False mask over every column, then overlay the computed flags.
    mask = pandas.Series(np.zeros(df.shape[1], dtype=bool), index=df.columns)
    mask.update(subset_mask)
    return mask.to_frame().T

masks = self._modin_frame.apply_full_axis(
func=mapper, axis=1, keep_partitioning=True
)

def reduce(df: pandas.DataFrame, mask: pandas.DataFrame):
    """
    Select the columns of `df` that are not flagged for removal by `mask`.

    ``condition`` (from the enclosing scope) is applied to `mask` and the
    result is inverted, giving a per-column "keep" flag.
    NOTE(review): `mask` presumably holds one row of flags per partition of
    the mapped frame - confirm against how ``broadcast_apply`` passes it in.
    """
    keep_flags = ~condition(mask)
    # A column is retained only if it is flagged to keep AND is present in `df`.
    kept_columns = [
        label for label, keep in keep_flags.items() if keep and label in df
    ]
    return df[kept_columns]

result = self._modin_frame.broadcast_apply(
# 'masks' have identical partitioning as we specified 'keep_partitioning=True' before,
# this means that we can safely skip the 'co-partitioning' stage
axis=1,
func=reduce,
other=masks,
copartition=False,
labels="drop",
)
return self.__constructor__(result)

return self.__constructor__(
self._modin_frame.filter(
kwargs.get("axis", 0) ^ 1,
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ tag_prefix =
parentdir_prefix = modin-

[tool:pytest]
addopts = --disable-pytest-warnings --cov-config=setup.cfg --cov=modin --cov-append --cov-report=
addopts = --disable-pytest-warnings
xfail_strict=true
markers =
xfail_executions
Expand Down

0 comments on commit 5a4b058

Please sign in to comment.