Improve performance of slice indexing (modin-project#1753)
* Improves performance of slice indexing

* Resolves modin-project#1752

Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>

* Fix issue

Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>

* Fix Series cases

Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>
devin-petersohn authored and aregm committed Sep 16, 2020
1 parent e647d26 commit b78a628
Showing 5 changed files with 192 additions and 65 deletions.
149 changes: 126 additions & 23 deletions modin/engines/base/frame/data.py
@@ -344,6 +344,14 @@ def mask(
BasePandasFrame
A new BasePandasFrame from the mask provided.
"""
if isinstance(row_numeric_idx, slice) and (
row_numeric_idx == slice(None) or row_numeric_idx == slice(0, None)
):
row_numeric_idx = None
if isinstance(col_numeric_idx, slice) and (
col_numeric_idx == slice(None) or col_numeric_idx == slice(0, None)
):
col_numeric_idx = None
if (
row_indices is None
and row_numeric_idx is None
@@ -354,11 +362,20 @@ def mask(
if row_indices is not None:
row_numeric_idx = self.index.get_indexer_for(row_indices)
if row_numeric_idx is not None:
row_partitions_list = self._get_dict_of_block_index(1, row_numeric_idx)
new_row_lengths = [
len(indices) for _, indices in row_partitions_list.items()
]
new_index = self.index[sorted(row_numeric_idx)]
row_partitions_list = self._get_dict_of_block_index(0, row_numeric_idx)
if isinstance(row_numeric_idx, slice):
# Row lengths for slice are calculated as the length of the slice
# on the partition. Often this will be the same length as the current
# length, but sometimes it is different, thus the extra calculation.
new_row_lengths = [
len(range(*idx.indices(self._row_lengths[p])))
for p, idx in row_partitions_list.items()
]
# Use the slice to calculate the new row index
new_index = self.index[row_numeric_idx]
else:
new_row_lengths = [len(idx) for _, idx in row_partitions_list.items()]
new_index = self.index[sorted(row_numeric_idx)]
else:
row_partitions_list = {
i: slice(None) for i in range(len(self._row_lengths))
@@ -369,15 +386,37 @@ def mask(
if col_indices is not None:
col_numeric_idx = self.columns.get_indexer_for(col_indices)
if col_numeric_idx is not None:
col_partitions_list = self._get_dict_of_block_index(0, col_numeric_idx)
new_col_widths = [
len(indices) for _, indices in col_partitions_list.items()
]
new_columns = self.columns[sorted(col_numeric_idx)]
if self._dtypes is not None:
new_dtypes = self.dtypes[sorted(col_numeric_idx)]
col_partitions_list = self._get_dict_of_block_index(1, col_numeric_idx)
if isinstance(col_numeric_idx, slice):
# Column widths for slice are calculated as the length of the slice
# on the partition. Often this will be the same length as the current
# length, but sometimes it is different, thus the extra calculation.
new_col_widths = [
len(range(*idx.indices(self._column_widths[p])))
for p, idx in col_partitions_list.items()
]
# Use the slice to calculate the new columns
new_columns = self.columns[col_numeric_idx]
assert sum(new_col_widths) == len(
new_columns
), "{} != {}.\n{}\n{}\n{}".format(
sum(new_col_widths),
len(new_columns),
col_numeric_idx,
self._column_widths,
col_partitions_list,
)
if self._dtypes is not None:
new_dtypes = self.dtypes[col_numeric_idx]
else:
new_dtypes = None
else:
new_dtypes = None
new_col_widths = [len(idx) for _, idx in col_partitions_list.items()]
new_columns = self.columns[sorted(col_numeric_idx)]
if self._dtypes is not None:
new_dtypes = self.dtypes[sorted(col_numeric_idx)]
else:
new_dtypes = None
else:
col_partitions_list = {
i: slice(None) for i in range(len(self._column_widths))
@@ -415,10 +454,12 @@ def mask(
# common case to keep it fast.
if (
row_numeric_idx is None
or isinstance(row_numeric_idx, slice)
or len(row_numeric_idx) == 1
or np.all(row_numeric_idx[1:] >= row_numeric_idx[:-1])
) and (
col_numeric_idx is None
or isinstance(col_numeric_idx, slice)
or len(col_numeric_idx) == 1
or np.all(col_numeric_idx[1:] >= col_numeric_idx[:-1])
):
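The slice branches added to mask() above compute each partition's new length with slice.indices() rather than materializing index lists. A minimal sketch of that calculation with hypothetical partition lengths and a hand-written partition mapping (not Modin code):

    row_lengths = [4, 4, 3]          # per-partition row counts (hypothetical)
    partition_slices = {             # a global slice(2, 9) mapped onto those partitions
        0: slice(2, None),
        1: slice(None),
        2: slice(None, 1),
    }

    new_row_lengths = [
        len(range(*part_slice.indices(row_lengths[part])))
        for part, part_slice in partition_slices.items()
    ]
    print(new_row_lengths)           # [2, 4, 1]; sums to len(range(11)[2:9]) == 7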
@@ -627,9 +668,9 @@ def _get_dict_of_block_index(self, axis, indices):
Parameters
----------
axis : (0 - columns, 1 - rows)
axis : (0 - rows, 1 - columns)
The axis along which to get the indices
indices : list of int
indices : list of int, slice
A list of global indices to convert.
Returns
@@ -638,11 +679,70 @@
A mapping from partition to list of internal indices to extract from that
partition.
"""
indices = np.sort(indices)
if not axis:
bins = np.array(self._column_widths)
else:
# Fasttrack slices
if isinstance(indices, slice):
if indices == slice(None) or indices == slice(0, None):
return OrderedDict(
zip(
range(len(self.axes[axis])),
[slice(None)] * len(self.axes[axis]),
)
)
if indices.start is None or indices.start == 0:
last_part, last_idx = list(
self._get_dict_of_block_index(axis, [indices.stop]).items()
)[0]
dict_of_slices = OrderedDict(
zip(range(last_part), [slice(None)] * last_part)
)
dict_of_slices.update({last_part: slice(last_idx[0])})
return dict_of_slices
elif indices.stop is None or indices.stop >= len(self.axes[axis]):
first_part, first_idx = list(
self._get_dict_of_block_index(axis, [indices.start]).items()
)[0]
dict_of_slices = OrderedDict({first_part: slice(first_idx[0], None)})
num_partitions = np.size(self._partitions, axis=axis)
part_list = range(first_part + 1, num_partitions)
dict_of_slices.update(
OrderedDict(zip(part_list, [slice(None)] * len(part_list)))
)
return dict_of_slices
else:
first_part, first_idx = list(
self._get_dict_of_block_index(axis, [indices.start]).items()
)[0]
last_part, last_idx = list(
self._get_dict_of_block_index(axis, [indices.stop]).items()
)[0]
if first_part == last_part:
return OrderedDict({first_part: slice(first_idx[0], last_idx[0])})
else:
if last_part - first_part == 1:
return OrderedDict(
{
first_part: slice(first_idx[0], None),
last_part: slice(None, last_idx[0]),
}
)
else:
dict_of_slices = OrderedDict(
{first_part: slice(first_idx[0], None)}
)
part_list = range(first_part + 1, last_part)
dict_of_slices.update(
OrderedDict(zip(part_list, [slice(None)] * len(part_list)))
)
dict_of_slices.update({last_part: slice(None, last_idx[0])})
return dict_of_slices
# Sort and convert negative indices to positive
indices = np.sort(
[i if i >= 0 else max(0, len(self.axes[axis]) + i) for i in indices]
)
if axis == 0:
bins = np.array(self._row_lengths)
else:
bins = np.array(self._column_widths)
# INT_MAX to make sure we don't try to compute on partitions that don't exist.
cumulative = np.append(bins[:-1].cumsum(), np.iinfo(bins.dtype).max)
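The slice fastpath in _get_dict_of_block_index above returns an OrderedDict of per-partition slices derived from the slice endpoints. The same mapping can be sketched as a standalone function over cumulative partition lengths; this is a hypothetical helper, not Modin's implementation, which instead recursively looks up where start and stop fall:

    from collections import OrderedDict

    import numpy as np


    def slice_to_partition_slices(global_slice, part_lengths):
        """Split a step-less global slice into one local slice per touched partition."""
        start, stop, _ = global_slice.indices(sum(part_lengths))
        uppers = np.cumsum(part_lengths)            # exclusive upper bound per partition
        lowers = uppers - np.asarray(part_lengths)  # inclusive lower bound per partition
        mapping = OrderedDict()
        for part, (lo, hi) in enumerate(zip(lowers, uppers)):
            if hi <= start or lo >= stop:
                continue                            # slice does not touch this partition
            local_start = int(max(start - lo, 0))
            local_stop = int(min(stop - lo, hi - lo))
            if local_start == 0 and local_stop == hi - lo:
                mapping[part] = slice(None)         # whole partition, as in the diff
            else:
                mapping[part] = slice(local_start, local_stop)
        return mapping


    print(slice_to_partition_slices(slice(2, 9), [4, 4, 3]))
    # partition 0 -> slice(2, 4), partition 1 -> slice(None), partition 2 -> slice(0, 1)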

@@ -1029,7 +1129,9 @@ def _apply_full_axis_select_indices(
old_index = self.index if axis else self.columns
if apply_indices is not None:
numeric_indices = old_index.get_indexer_for(apply_indices)
dict_indices = self._get_dict_of_block_index(axis, numeric_indices)
# Get the indices for the axis being applied to (it is the opposite of axis
# being applied over)
dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices)
new_partitions = self._frame_mgr_cls.apply_func_to_select_indices_along_full_axis(
axis, self._partitions, func, dict_indices, keep_remaining=keep_remaining
)
@@ -1084,7 +1186,8 @@ def _apply_select_indices(
# Convert indices to numeric indices
old_index = self.index if axis else self.columns
numeric_indices = old_index.get_indexer_for(apply_indices)
dict_indices = self._get_dict_of_block_index(axis, numeric_indices)
# Get indices being applied to (opposite of indices being applied over)
dict_indices = self._get_dict_of_block_index(axis ^ 1, numeric_indices)
new_partitions = self._frame_mgr_cls.apply_func_to_select_indices(
axis,
self._partitions,
@@ -1113,8 +1216,8 @@ def _apply_select_indices(
assert row_indices is not None and col_indices is not None
assert keep_remaining
assert item_to_distribute is not None
row_partitions_list = self._get_dict_of_block_index(1, row_indices).items()
col_partitions_list = self._get_dict_of_block_index(0, col_indices).items()
row_partitions_list = self._get_dict_of_block_index(0, row_indices).items()
col_partitions_list = self._get_dict_of_block_index(1, col_indices).items()
new_partitions = self._frame_mgr_cls.apply_func_to_indices_both_axis(
self._partitions,
func,
14 changes: 4 additions & 10 deletions modin/engines/dask/pandas_on_dask/frame/partition.py
@@ -98,16 +98,10 @@ def mask(self, row_indices, col_indices):
new_obj = self.add_to_apply_calls(
lambda df: pandas.DataFrame(df.iloc[row_indices, col_indices])
)
new_obj._length_cache = (
len(row_indices)
if not isinstance(row_indices, slice)
else self._length_cache
)
new_obj._width_cache = (
len(col_indices)
if not isinstance(col_indices, slice)
else self._width_cache
)
if not isinstance(row_indices, slice):
new_obj._length_cache = len(row_indices)
if not isinstance(col_indices, slice):
new_obj._width_cache = len(col_indices)
return new_obj

def __copy__(self):
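The change above stops guessing a length when the mask is a slice: the number of selected rows cannot be known from the slice alone, so the cache is left unset and the length is computed lazily later. A small sketch of that pattern with a hypothetical Partition class (not the actual Dask partition API):

    import pandas


    class Partition:
        def __init__(self, data):
            self._data = data
            self._length_cache = None

        def mask(self, row_indices):
            masked = Partition(self._data.iloc[row_indices])
            if not isinstance(row_indices, slice):
                # an explicit list of positions tells us the new length up front
                masked._length_cache = len(row_indices)
            # for a slice the length is unknown here, so the cache stays None
            return masked

        def length(self):
            if self._length_cache is None:
                self._length_cache = len(self._data)  # computed lazily on demand
            return self._length_cache


    part = Partition(pandas.DataFrame({"a": range(10)}))
    print(part.mask([1, 3, 5]).length())       # 3, taken from the cache
    print(part.mask(slice(2, None)).length())  # 8, computed lazily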
14 changes: 4 additions & 10 deletions modin/engines/python/pandas_on_python/frame/partition.py
@@ -91,16 +91,10 @@ def mask(self, row_indices=None, col_indices=None):
new_obj = self.add_to_apply_calls(
lambda df: pandas.DataFrame(df.iloc[row_indices, col_indices])
)
new_obj._length_cache = (
len(row_indices)
if not isinstance(row_indices, slice)
else self._length_cache
)
new_obj._width_cache = (
len(col_indices)
if not isinstance(col_indices, slice)
else self._width_cache
)
if not isinstance(row_indices, slice):
new_obj._length_cache = len(row_indices)
if not isinstance(col_indices, slice):
new_obj._width_cache = len(col_indices)
return new_obj

def to_pandas(self):
27 changes: 13 additions & 14 deletions modin/engines/ray/pandas_on_ray/frame/partition.py
@@ -103,30 +103,29 @@ def to_numpy(self):

def mask(self, row_indices, col_indices):
if (
isinstance(row_indices, slice)
(isinstance(row_indices, slice) and row_indices == slice(None))
or (
self._length_cache is not None
not isinstance(row_indices, slice)
and self._length_cache is not None
and len(row_indices) == self._length_cache
)
) and (
isinstance(col_indices, slice)
or (self._width_cache is not None and len(col_indices) == self._width_cache)
(isinstance(col_indices, slice) and col_indices == slice(None))
or (
not isinstance(col_indices, slice)
and self._width_cache is not None
and len(col_indices) == self._width_cache
)
):
return self.__copy__()

new_obj = self.add_to_apply_calls(
lambda df: pandas.DataFrame(df.iloc[row_indices, col_indices])
)
new_obj._length_cache = (
len(row_indices)
if not isinstance(row_indices, slice)
else self._length_cache
)
new_obj._width_cache = (
len(col_indices)
if not isinstance(col_indices, slice)
else self._width_cache
)
if not isinstance(row_indices, slice):
new_obj._length_cache = len(row_indices)
if not isinstance(col_indices, slice):
new_obj._width_cache = len(col_indices)
return new_obj

@classmethod
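With arbitrary slices now reaching the partitions, only a bare slice(None), or an explicit index list whose length matches the cached size, still means "keep everything". A condensed sketch of the new no-op check, using hypothetical helper names rather than the real method on the Ray partition class:

    def is_full_axis(indices, cached_size):
        if isinstance(indices, slice):
            return indices == slice(None)
        return cached_size is not None and len(indices) == cached_size


    def should_copy_instead_of_mask(row_indices, col_indices, length_cache, width_cache):
        # Mirrors the condition in the diff: copy only when both axes keep everything.
        return is_full_axis(row_indices, length_cache) and is_full_axis(
            col_indices, width_cache
        )


    print(should_copy_instead_of_mask(slice(None), slice(None), None, None))  # True
    print(should_copy_instead_of_mask(slice(2, 5), slice(None), 10, 4))       # False
    print(should_copy_instead_of_mask([0, 1, 2], slice(None), 3, 4))          # True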
53 changes: 45 additions & 8 deletions modin/pandas/indexing.py
@@ -154,15 +154,38 @@ def __setitem__(self, row_lookup, col_lookup, item):
item: The new item needs to be set. It can be any shape that's
broadcast-able to the product of the lookup tables.
"""
if len(row_lookup) == len(self.qc.index) and len(col_lookup) == 1:
# Convert slices to indices for the purposes of application.
# TODO (devin-petersohn): Apply to slice without conversion to list
if isinstance(row_lookup, slice):
row_lookup = range(len(self.qc.index))[row_lookup]
if isinstance(col_lookup, slice):
col_lookup = range(len(self.qc.columns))[col_lookup]
# This is True when we dealing with assignment of a full column. This case
# should be handled in a fastpath with `df[col] = item`.
if (
len(row_lookup) == len(self.qc.index)
and len(col_lookup) == 1
and hasattr(self.df, "columns")
):
self.df[self.df.columns[col_lookup][0]] = item
# This is True when we are assigning to a full row. We want to reuse the setitem
# mechanism to operate along only one axis for performance reasons.
elif len(col_lookup) == len(self.qc.columns) and len(row_lookup) == 1:
if hasattr(item, "_query_compiler"):
item = item._query_compiler
new_qc = self.qc.setitem(1, self.qc.index[row_lookup[0]], item)
self.df._create_or_update_from_compiler(new_qc, inplace=True)
# Assignment to both axes.
else:
to_shape = (len(row_lookup), len(col_lookup))
if isinstance(row_lookup, slice):
new_row_len = len(self.df.index[row_lookup])
else:
new_row_len = len(row_lookup)
if isinstance(col_lookup, slice):
new_col_len = len(self.df.columns[col_lookup])
else:
new_col_len = len(col_lookup)
to_shape = new_row_len, new_col_len
item = self._broadcast_item(row_lookup, col_lookup, item, to_shape)
self._write_items(row_lookup, col_lookup, item)
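The new __setitem__ first normalizes slice lookups into ranges so that length checks and the single-axis fastpaths work uniformly on either input kind. A minimal sketch of that conversion and of the resulting broadcast shape, with hypothetical frame sizes (simplified from the method above):

    import pandas

    index = pandas.RangeIndex(8)           # stand-in for self.qc.index
    columns = pandas.Index(list("abcd"))   # stand-in for self.qc.columns

    row_lookup, col_lookup = slice(2, 6), slice(None)

    # Slices become ranges so len() and element access behave like index lists.
    if isinstance(row_lookup, slice):
        row_lookup = range(len(index))[row_lookup]
    if isinstance(col_lookup, slice):
        col_lookup = range(len(columns))[col_lookup]

    to_shape = (len(row_lookup), len(col_lookup))
    print(list(row_lookup), list(col_lookup), to_shape)
    # [2, 3, 4, 5] [0, 1, 2, 3] (4, 4)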

@@ -358,12 +381,26 @@ def __setitem__(self, key, item):
super(_iLocIndexer, self).__setitem__(row_lookup, col_lookup, item)

def _compute_lookup(self, row_loc, col_loc):
row_lookup = (
pandas.RangeIndex(len(self.qc.index)).to_series().iloc[row_loc].index
)
col_lookup = (
pandas.RangeIndex(len(self.qc.columns)).to_series().iloc[col_loc].index
)
if (
not isinstance(row_loc, slice)
or isinstance(row_loc, slice)
and row_loc.step is not None
):
row_lookup = (
pandas.RangeIndex(len(self.qc.index)).to_series().iloc[row_loc].index
)
else:
row_lookup = row_loc
if (
not isinstance(col_loc, slice)
or isinstance(col_loc, slice)
and col_loc.step is not None
):
col_lookup = (
pandas.RangeIndex(len(self.qc.columns)).to_series().iloc[col_loc].index
)
else:
col_lookup = col_loc
return row_lookup, col_lookup

def _check_dtypes(self, locator):
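The _compute_lookup change passes step-less slices through untouched so the downstream mask stays on the slice fastpath; only stepped slices and explicit locators are resolved through a RangeIndex. A sketch of that rule as a standalone function (hypothetical signature mirroring the shape of the logic, not the exact method):

    import pandas


    def compute_row_lookup(row_loc, n_rows):
        if isinstance(row_loc, slice) and row_loc.step is None:
            return row_loc  # fastpath: keep the slice so mask() can stay lazy
        # stepped slices and explicit locators are materialized into positions
        return pandas.RangeIndex(n_rows).to_series().iloc[row_loc].index


    print(compute_row_lookup(slice(1, 5), 10))           # slice(1, 5, None), passed through
    print(compute_row_lookup(slice(None, None, 2), 10))  # positions 0, 2, 4, 6, 8
    print(compute_row_lookup([7, 3], 10))                # positions 7, 3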
