From b78a6282357c2f9362731cf5489bb9c389783145 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Tue, 21 Jul 2020 01:03:51 -0700 Subject: [PATCH] Improve performance of slice indexing (#1753) * Improves performance of slice indexing * Resolves #1752 Signed-off-by: Devin Petersohn * Fix issue Signed-off-by: Devin Petersohn * Fix Series cases Signed-off-by: Devin Petersohn --- modin/engines/base/frame/data.py | 149 +++++++++++++++--- .../dask/pandas_on_dask/frame/partition.py | 14 +- .../pandas_on_python/frame/partition.py | 14 +- .../ray/pandas_on_ray/frame/partition.py | 27 ++-- modin/pandas/indexing.py | 53 ++++++- 5 files changed, 192 insertions(+), 65 deletions(-) diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index a5ba3b45fdb..520e9d28bd8 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -344,6 +344,14 @@ def mask( BasePandasFrame A new BasePandasFrame from the mask provided. """ + if isinstance(row_numeric_idx, slice) and ( + row_numeric_idx == slice(None) or row_numeric_idx == slice(0, None) + ): + row_numeric_idx = None + if isinstance(col_numeric_idx, slice) and ( + col_numeric_idx == slice(None) or col_numeric_idx == slice(0, None) + ): + col_numeric_idx = None if ( row_indices is None and row_numeric_idx is None @@ -354,11 +362,20 @@ def mask( if row_indices is not None: row_numeric_idx = self.index.get_indexer_for(row_indices) if row_numeric_idx is not None: - row_partitions_list = self._get_dict_of_block_index(1, row_numeric_idx) - new_row_lengths = [ - len(indices) for _, indices in row_partitions_list.items() - ] - new_index = self.index[sorted(row_numeric_idx)] + row_partitions_list = self._get_dict_of_block_index(0, row_numeric_idx) + if isinstance(row_numeric_idx, slice): + # Row lengths for slice are calculated as the length of the slice + # on the partition. Often this will be the same length as the current + # length, but sometimes it is different, thus the extra calculation. + new_row_lengths = [ + len(range(*idx.indices(self._row_lengths[p]))) + for p, idx in row_partitions_list.items() + ] + # Use the slice to calculate the new row index + new_index = self.index[row_numeric_idx] + else: + new_row_lengths = [len(idx) for _, idx in row_partitions_list.items()] + new_index = self.index[sorted(row_numeric_idx)] else: row_partitions_list = { i: slice(None) for i in range(len(self._row_lengths)) @@ -369,15 +386,37 @@ def mask( if col_indices is not None: col_numeric_idx = self.columns.get_indexer_for(col_indices) if col_numeric_idx is not None: - col_partitions_list = self._get_dict_of_block_index(0, col_numeric_idx) - new_col_widths = [ - len(indices) for _, indices in col_partitions_list.items() - ] - new_columns = self.columns[sorted(col_numeric_idx)] - if self._dtypes is not None: - new_dtypes = self.dtypes[sorted(col_numeric_idx)] + col_partitions_list = self._get_dict_of_block_index(1, col_numeric_idx) + if isinstance(col_numeric_idx, slice): + # Column widths for slice are calculated as the length of the slice + # on the partition. Often this will be the same length as the current + # length, but sometimes it is different, thus the extra calculation. 
+ new_col_widths = [ + len(range(*idx.indices(self._column_widths[p]))) + for p, idx in col_partitions_list.items() + ] + # Use the slice to calculate the new columns + new_columns = self.columns[col_numeric_idx] + assert sum(new_col_widths) == len( + new_columns + ), "{} != {}.\n{}\n{}\n{}".format( + sum(new_col_widths), + len(new_columns), + col_numeric_idx, + self._column_widths, + col_partitions_list, + ) + if self._dtypes is not None: + new_dtypes = self.dtypes[col_numeric_idx] + else: + new_dtypes = None else: - new_dtypes = None + new_col_widths = [len(idx) for _, idx in col_partitions_list.items()] + new_columns = self.columns[sorted(col_numeric_idx)] + if self._dtypes is not None: + new_dtypes = self.dtypes[sorted(col_numeric_idx)] + else: + new_dtypes = None else: col_partitions_list = { i: slice(None) for i in range(len(self._column_widths)) @@ -415,10 +454,12 @@ def mask( # common case to keep it fast. if ( row_numeric_idx is None + or isinstance(row_numeric_idx, slice) or len(row_numeric_idx) == 1 or np.all(row_numeric_idx[1:] >= row_numeric_idx[:-1]) ) and ( col_numeric_idx is None + or isinstance(col_numeric_idx, slice) or len(col_numeric_idx) == 1 or np.all(col_numeric_idx[1:] >= col_numeric_idx[:-1]) ): @@ -627,9 +668,9 @@ def _get_dict_of_block_index(self, axis, indices): Parameters ---------- - axis : (0 - columns, 1 - rows) + axis : (0 - rows, 1 - columns) The axis along which to get the indices - indices : list of int + indices : list of int, slice A list of global indices to convert. Returns @@ -638,11 +679,70 @@ def _get_dict_of_block_index(self, axis, indices): A mapping from partition to list of internal indices to extract from that partition. """ - indices = np.sort(indices) - if not axis: - bins = np.array(self._column_widths) - else: + # Fasttrack slices + if isinstance(indices, slice): + if indices == slice(None) or indices == slice(0, None): + return OrderedDict( + zip( + range(len(self.axes[axis])), + [slice(None)] * len(self.axes[axis]), + ) + ) + if indices.start is None or indices.start == 0: + last_part, last_idx = list( + self._get_dict_of_block_index(axis, [indices.stop]).items() + )[0] + dict_of_slices = OrderedDict( + zip(range(last_part), [slice(None)] * last_part) + ) + dict_of_slices.update({last_part: slice(last_idx[0])}) + return dict_of_slices + elif indices.stop is None or indices.stop >= len(self.axes[axis]): + first_part, first_idx = list( + self._get_dict_of_block_index(axis, [indices.start]).items() + )[0] + dict_of_slices = OrderedDict({first_part: slice(first_idx[0], None)}) + num_partitions = np.size(self._partitions, axis=axis) + part_list = range(first_part + 1, num_partitions) + dict_of_slices.update( + OrderedDict(zip(part_list, [slice(None)] * len(part_list))) + ) + return dict_of_slices + else: + first_part, first_idx = list( + self._get_dict_of_block_index(axis, [indices.start]).items() + )[0] + last_part, last_idx = list( + self._get_dict_of_block_index(axis, [indices.stop]).items() + )[0] + if first_part == last_part: + return OrderedDict({first_part: slice(first_idx[0], last_idx[0])}) + else: + if last_part - first_part == 1: + return OrderedDict( + { + first_part: slice(first_idx[0], None), + last_part: slice(None, last_idx[0]), + } + ) + else: + dict_of_slices = OrderedDict( + {first_part: slice(first_idx[0], None)} + ) + part_list = range(first_part + 1, last_part) + dict_of_slices.update( + OrderedDict(zip(part_list, [slice(None)] * len(part_list))) + ) + dict_of_slices.update({last_part: slice(None, last_idx[0])}) + return 
dict_of_slices + # Sort and convert negative indices to positive + indices = np.sort( + [i if i >= 0 else max(0, len(self.axes[axis]) + i) for i in indices] + ) + if axis == 0: bins = np.array(self._row_lengths) + else: + bins = np.array(self._column_widths) # INT_MAX to make sure we don't try to compute on partitions that don't exist. cumulative = np.append(bins[:-1].cumsum(), np.iinfo(bins.dtype).max) @@ -1029,7 +1129,9 @@ def _apply_full_axis_select_indices( old_index = self.index if axis else self.columns if apply_indices is not None: numeric_indices = old_index.get_indexer_for(apply_indices) - dict_indices = self._get_dict_of_block_index(axis, numeric_indices) + # Get the indices for the axis being applied to (it is the opposite of axis + # being applied over) + dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices) new_partitions = self._frame_mgr_cls.apply_func_to_select_indices_along_full_axis( axis, self._partitions, func, dict_indices, keep_remaining=keep_remaining ) @@ -1084,7 +1186,8 @@ def _apply_select_indices( # Convert indices to numeric indices old_index = self.index if axis else self.columns numeric_indices = old_index.get_indexer_for(apply_indices) - dict_indices = self._get_dict_of_block_index(axis, numeric_indices) + # Get indices being applied to (opposite of indices being applied over) + dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices) new_partitions = self._frame_mgr_cls.apply_func_to_select_indices( axis, self._partitions, @@ -1113,8 +1216,8 @@ def _apply_select_indices( assert row_indices is not None and col_indices is not None assert keep_remaining assert item_to_distribute is not None - row_partitions_list = self._get_dict_of_block_index(1, row_indices).items() - col_partitions_list = self._get_dict_of_block_index(0, col_indices).items() + row_partitions_list = self._get_dict_of_block_index(0, row_indices).items() + col_partitions_list = self._get_dict_of_block_index(1, col_indices).items() new_partitions = self._frame_mgr_cls.apply_func_to_indices_both_axis( self._partitions, func, diff --git a/modin/engines/dask/pandas_on_dask/frame/partition.py b/modin/engines/dask/pandas_on_dask/frame/partition.py index 67457172661..a702173abc0 100644 --- a/modin/engines/dask/pandas_on_dask/frame/partition.py +++ b/modin/engines/dask/pandas_on_dask/frame/partition.py @@ -98,16 +98,10 @@ def mask(self, row_indices, col_indices): new_obj = self.add_to_apply_calls( lambda df: pandas.DataFrame(df.iloc[row_indices, col_indices]) ) - new_obj._length_cache = ( - len(row_indices) - if not isinstance(row_indices, slice) - else self._length_cache - ) - new_obj._width_cache = ( - len(col_indices) - if not isinstance(col_indices, slice) - else self._width_cache - ) + if not isinstance(row_indices, slice): + new_obj._length_cache = len(row_indices) + if not isinstance(col_indices, slice): + new_obj._width_cache = len(col_indices) return new_obj def __copy__(self): diff --git a/modin/engines/python/pandas_on_python/frame/partition.py b/modin/engines/python/pandas_on_python/frame/partition.py index b24b8701ec1..d2cfc8f238a 100644 --- a/modin/engines/python/pandas_on_python/frame/partition.py +++ b/modin/engines/python/pandas_on_python/frame/partition.py @@ -91,16 +91,10 @@ def mask(self, row_indices=None, col_indices=None): new_obj = self.add_to_apply_calls( lambda df: pandas.DataFrame(df.iloc[row_indices, col_indices]) ) - new_obj._length_cache = ( - len(row_indices) - if not isinstance(row_indices, slice) - else self._length_cache - ) - 
new_obj._width_cache = ( - len(col_indices) - if not isinstance(col_indices, slice) - else self._width_cache - ) + if not isinstance(row_indices, slice): + new_obj._length_cache = len(row_indices) + if not isinstance(col_indices, slice): + new_obj._width_cache = len(col_indices) return new_obj def to_pandas(self): diff --git a/modin/engines/ray/pandas_on_ray/frame/partition.py b/modin/engines/ray/pandas_on_ray/frame/partition.py index 00c5ac34727..9cfb5f5af3e 100644 --- a/modin/engines/ray/pandas_on_ray/frame/partition.py +++ b/modin/engines/ray/pandas_on_ray/frame/partition.py @@ -103,30 +103,29 @@ def to_numpy(self): def mask(self, row_indices, col_indices): if ( - isinstance(row_indices, slice) + (isinstance(row_indices, slice) and row_indices == slice(None)) or ( - self._length_cache is not None + not isinstance(row_indices, slice) + and self._length_cache is not None and len(row_indices) == self._length_cache ) ) and ( - isinstance(col_indices, slice) - or (self._width_cache is not None and len(col_indices) == self._width_cache) + (isinstance(col_indices, slice) and col_indices == slice(None)) + or ( + not isinstance(col_indices, slice) + and self._width_cache is not None + and len(col_indices) == self._width_cache + ) ): return self.__copy__() new_obj = self.add_to_apply_calls( lambda df: pandas.DataFrame(df.iloc[row_indices, col_indices]) ) - new_obj._length_cache = ( - len(row_indices) - if not isinstance(row_indices, slice) - else self._length_cache - ) - new_obj._width_cache = ( - len(col_indices) - if not isinstance(col_indices, slice) - else self._width_cache - ) + if not isinstance(row_indices, slice): + new_obj._length_cache = len(row_indices) + if not isinstance(col_indices, slice): + new_obj._width_cache = len(col_indices) return new_obj @classmethod diff --git a/modin/pandas/indexing.py b/modin/pandas/indexing.py index f853ade33a7..68d60db0006 100644 --- a/modin/pandas/indexing.py +++ b/modin/pandas/indexing.py @@ -154,15 +154,38 @@ def __setitem__(self, row_lookup, col_lookup, item): item: The new item needs to be set. It can be any shape that's broadcast-able to the product of the lookup tables. """ - if len(row_lookup) == len(self.qc.index) and len(col_lookup) == 1: + # Convert slices to indices for the purposes of application. + # TODO (devin-petersohn): Apply to slice without conversion to list + if isinstance(row_lookup, slice): + row_lookup = range(len(self.qc.index))[row_lookup] + if isinstance(col_lookup, slice): + col_lookup = range(len(self.qc.columns))[col_lookup] + # This is True when we dealing with assignment of a full column. This case + # should be handled in a fastpath with `df[col] = item`. + if ( + len(row_lookup) == len(self.qc.index) + and len(col_lookup) == 1 + and hasattr(self.df, "columns") + ): self.df[self.df.columns[col_lookup][0]] = item + # This is True when we are assigning to a full row. We want to reuse the setitem + # mechanism to operate along only one axis for performance reasons. elif len(col_lookup) == len(self.qc.columns) and len(row_lookup) == 1: if hasattr(item, "_query_compiler"): item = item._query_compiler new_qc = self.qc.setitem(1, self.qc.index[row_lookup[0]], item) self.df._create_or_update_from_compiler(new_qc, inplace=True) + # Assignment to both axes. 
else: - to_shape = (len(row_lookup), len(col_lookup)) + if isinstance(row_lookup, slice): + new_row_len = len(self.df.index[row_lookup]) + else: + new_row_len = len(row_lookup) + if isinstance(col_lookup, slice): + new_col_len = len(self.df.columns[col_lookup]) + else: + new_col_len = len(col_lookup) + to_shape = new_row_len, new_col_len item = self._broadcast_item(row_lookup, col_lookup, item, to_shape) self._write_items(row_lookup, col_lookup, item) @@ -358,12 +381,26 @@ def __setitem__(self, key, item): super(_iLocIndexer, self).__setitem__(row_lookup, col_lookup, item) def _compute_lookup(self, row_loc, col_loc): - row_lookup = ( - pandas.RangeIndex(len(self.qc.index)).to_series().iloc[row_loc].index - ) - col_lookup = ( - pandas.RangeIndex(len(self.qc.columns)).to_series().iloc[col_loc].index - ) + if ( + not isinstance(row_loc, slice) + or isinstance(row_loc, slice) + and row_loc.step is not None + ): + row_lookup = ( + pandas.RangeIndex(len(self.qc.index)).to_series().iloc[row_loc].index + ) + else: + row_lookup = row_loc + if ( + not isinstance(col_loc, slice) + or isinstance(col_loc, slice) + and col_loc.step is not None + ): + col_lookup = ( + pandas.RangeIndex(len(self.qc.columns)).to_series().iloc[col_loc].index + ) + else: + col_lookup = col_loc return row_lookup, col_lookup def _check_dtypes(self, locator):
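
Editor's note (not part of the patch): the core optimization here is the slice fasttrack added to `_get_dict_of_block_index`, which maps a global row or column slice directly onto per-partition slices instead of expanding the slice into an index array and sorting it. The following is a minimal standalone sketch of that idea, assuming a simple list of partition lengths; the function name `slice_to_partition_slices` and the `partition_lengths` argument are illustrative and do not appear in Modin.

# Illustrative sketch only: maps a global slice(start, stop) (step of 1 assumed)
# onto per-partition internal slices, mirroring the fasttrack idea in the patch.
from collections import OrderedDict

import numpy as np


def slice_to_partition_slices(partition_lengths, sl):
    """Map a global slice onto an OrderedDict of {partition: internal slice}."""
    total = sum(partition_lengths)
    start, stop, _ = sl.indices(total)
    # Cumulative partition boundaries, e.g. lengths [3, 3, 4] -> [3, 6, 10].
    cumulative = np.cumsum(partition_lengths)
    result = OrderedDict()
    prev_bound = 0
    for part, bound in enumerate(cumulative):
        if stop <= prev_bound:
            break  # No remaining partition overlaps the slice.
        if start < bound:
            # Translate global positions into positions local to this partition.
            local_start = max(start - prev_bound, 0)
            local_stop = min(stop, bound) - prev_bound
            # Fully covered partitions are kept as slice(None) so downstream
            # code can skip slicing them entirely, as the patch does.
            if local_start == 0 and local_stop == partition_lengths[part]:
                result[part] = slice(None)
            else:
                result[part] = slice(local_start, local_stop)
        prev_bound = bound
    return result


# Example: partitions of lengths [3, 3, 4] and the global slice 2:8
# -> OrderedDict([(0, slice(2, 3)), (1, slice(None)), (2, slice(0, 2))])
print(slice_to_partition_slices([3, 3, 4], slice(2, 8)))

Compared with the previous path, which converted the slice into a list of positional indices (and, in `_compute_lookup`, into a materialized index via `pandas.RangeIndex(...).to_series().iloc[...]`), this touches only the partitions the slice actually intersects and preserves `slice(None)` for whole partitions. In practice this means a call such as `df.iloc[:1000]` can now pass a slice object all the way down to the partitions rather than an index array, which is where the performance improvement in this patch comes from.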