diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 2f49c4b2404..d889caf2853 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -231,6 +231,22 @@ def _set_axis_lengths_cache(self, value, axis=0): else: self._column_widths_cache = value + def _get_axis_lengths_cache(self, axis=0): + """ + Get partition's shape caches along the specified axis if available. + + Parameters + ---------- + axis : int, default 0 + 0 - get row lengths cache, 1 - get column widths cache. + + Returns + ------- + list of ints or None + If the cache is computed return a list of ints, ``None`` otherwise. + """ + return self._row_lengths_cache if axis == 0 else self._column_widths_cache + @property def has_dtypes_cache(self): """ @@ -2814,7 +2830,14 @@ def apply_select_indices( @lazy_metadata_decorator(apply_axis="both") def broadcast_apply( - self, axis, func, other, join_type="left", labels="keep", dtypes=None + self, + axis, + func, + other, + join_type="left", + copartition=True, + labels="keep", + dtypes=None, ): """ Broadcast axis partitions of `other` to partitions of `self` and apply a function. @@ -2829,6 +2852,13 @@ def broadcast_apply( Modin DataFrame to broadcast. join_type : str, default: "left" Type of join to apply. + copartition : bool, default: True + Whether to align indices/partitioning of the `self` and `other` frame. + Disabling this may save some time, however, you have to be 100% sure that + the indexing and partitioning are identical along the broadcasting axis, + this might be the case for example if `other` is a projection of the `self` + or vice-versa. If copartitioning is disabled and partitioning/indexing are + incompatible then you may end up with undefined behavior. 
labels : {"keep", "replace", "drop"}, default: "keep" Whether keep labels from `self` Modin DataFrame, replace them with labels from joined DataFrame or drop altogether to make them be computed lazily later. @@ -2840,17 +2870,28 @@ def broadcast_apply( PandasDataframe New Modin DataFrame. """ - # Only sort the indices if they do not match - ( - left_parts, - right_parts, - joined_index, - partition_sizes_along_axis, - ) = self._copartition( - axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis]) - ) - # unwrap list returned by `copartition`. - right_parts = right_parts[0] + if copartition: + # Only sort the indices if they do not match + ( + left_parts, + right_parts, + joined_index, + partition_sizes_along_axis, + ) = self._copartition( + axis, + other, + join_type, + sort=not self.axes[axis].equals(other.axes[axis]), + ) + # unwrap list returned by `copartition`. + right_parts = right_parts[0] + else: + left_parts = self._partitions + right_parts = other._partitions + partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache( + axis + ), self.copy_axis_cache(axis) + new_frame = self._partition_mgr_cls.broadcast_apply( axis, func, left_parts, right_parts ) @@ -2868,13 +2909,19 @@ def _pick_axis(get_axis, sizes_cache): if axis == 0: # Pass shape caches instead of values in order to not trigger shape computation. 
new_index, new_row_lengths = _pick_axis( - self._get_index, self._row_lengths_cache + self.copy_index_cache, self._row_lengths_cache + ) + new_columns, new_column_widths = ( + self.copy_columns_cache(), + self._column_widths_cache, ) - new_columns, new_column_widths = self.columns, self._column_widths_cache else: - new_index, new_row_lengths = self.index, self._row_lengths_cache + new_index, new_row_lengths = ( + self.copy_index_cache(), + self._row_lengths_cache, + ) new_columns, new_column_widths = _pick_axis( - self._get_columns, self._column_widths_cache + self.copy_columns_cache, self._column_widths_cache ) return self.__constructor__( diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index b4df19a88d2..0a601a7963a 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2828,6 +2828,53 @@ def setitem_builder(df, internal_indices=[]): # pragma: no cover # Drop/Dropna # This will change the shape of the resulting data. 
def dropna(self, **kwargs): + if kwargs.get("axis", 0) == 1 and kwargs.get("thresh", no_default) in ( + no_default, + None, + ): + how = kwargs.get("how", "any") + subset = kwargs.get("subset") + how = "any" if how in (no_default, None) else how + condition = lambda df: getattr(df, how)() # noqa: E731 (lambda assignment) + + def mapper(df: pandas.DataFrame): + if subset is not None: + subset_mask = condition( + df.loc[df.index.intersection(subset)].isna() + ) + mask = pandas.Series( + np.zeros(df.shape[1], dtype=bool), index=df.columns + ) + mask.update(subset_mask) + else: + mask = condition(df.isna()) + return mask.to_frame().T + + masks = self._modin_frame.apply_full_axis( + func=mapper, axis=1, keep_partitioning=True + ) + + def reduce(df: pandas.DataFrame, mask: pandas.DataFrame): + to_take_mask = ~condition(mask) + + to_take = [] + for col, value in to_take_mask.items(): + if value and col in df: + to_take.append(col) + + return df[to_take] + + result = self._modin_frame.broadcast_apply( + # 'masks' have identical partitioning as we specified 'keep_partitioning=True' before, + # this means that we can safely skip the 'co-partitioning' stage + axis=1, + func=reduce, + other=masks, + copartition=False, + labels="drop", + ) + return self.__constructor__(result) + return self.__constructor__( self._modin_frame.filter( kwargs.get("axis", 0) ^ 1, diff --git a/setup.cfg b/setup.cfg index 2a9bbd5f8eb..82f10d0aa38 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,7 @@ tag_prefix = parentdir_prefix = modin- [tool:pytest] -addopts = --disable-pytest-warnings --cov-config=setup.cfg --cov=modin --cov-append --cov-report= +addopts = --disable-pytest-warnings xfail_strict=true markers = xfail_executions