Skip to content

Commit c2e7f9e

Browse files
authored
FIX-#2374: remove extra code; add pandas way to handle duplicate values in reindex func for binary operations (#2378)
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
1 parent 7458746 commit c2e7f9e

File tree

4 files changed

+188
-98
lines changed

4 files changed

+188
-98
lines changed

asv_bench/benchmarks/benchmarks.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
JOIN_DATA_SIZE = MERGE_DATA_SIZE
4242
ARITHMETIC_DATA_SIZE = GROUPBY_DATA_SIZE
4343

44+
CONCAT_DATA_SIZE = [(10_128, 100, 10_000, 128)]
45+
4446

4547
class TimeGroupBy:
4648
param_names = ["impl", "data_type", "data_size"]
@@ -111,6 +113,51 @@ def time_merge(self, impl, data_type, data_size, how, sort):
111113
self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)
112114

113115

116+
class TimeConcat:
117+
param_names = ["data_type", "data_size", "how", "axis"]
118+
params = [
119+
["int"],
120+
CONCAT_DATA_SIZE,
121+
["inner"],
122+
[0, 1],
123+
]
124+
125+
def setup(self, data_type, data_size, how, axis):
126+
# shape for generate_dataframe: first - ncols, second - nrows
127+
self.df1 = generate_dataframe(
128+
"modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
129+
)
130+
self.df2 = generate_dataframe(
131+
"modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
132+
)
133+
134+
def time_concat(self, data_type, data_size, how, axis):
135+
pd.concat([self.df1, self.df2], axis=axis, join=how)
136+
137+
138+
class TimeBinaryOp:
139+
param_names = ["data_type", "data_size", "binary_op", "axis"]
140+
params = [
141+
["int"],
142+
CONCAT_DATA_SIZE,
143+
["mul"],
144+
[0, 1],
145+
]
146+
147+
def setup(self, data_type, data_size, binary_op, axis):
148+
# shape for generate_dataframe: first - ncols, second - nrows
149+
self.df1 = generate_dataframe(
150+
"modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
151+
)
152+
self.df2 = generate_dataframe(
153+
"modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
154+
)
155+
self.op = getattr(self.df1, binary_op)
156+
157+
def time_binary_op(self, data_type, data_size, binary_op, axis):
158+
self.op(self.df2, axis=axis)
159+
160+
114161
class TimeArithmetic:
115162
param_names = ["impl", "data_type", "data_size", "axis"]
116163
params = [

modin/engines/base/frame/data.py

Lines changed: 124 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,8 @@ def internal(block_idx, global_index):
967967
]
968968
return OrderedDict(partition_ids_with_indices)
969969

970-
def _join_index_objects(self, axis, other_index, how, sort):
970+
@staticmethod
971+
def _join_index_objects(axis, indexes, how, sort):
971972
"""
972973
Join the pair of index objects (columns or rows) by a given strategy.
973974
@@ -976,37 +977,80 @@ def _join_index_objects(self, axis, other_index, how, sort):
976977
977978
Parameters
978979
----------
979-
axis : 0 or 1
980-
The axis index object to join (0 - rows, 1 - columns).
981-
other_index : Index
982-
The other_index to join on.
983-
how : {'left', 'right', 'inner', 'outer'}
984-
The type of join to join to make.
985-
sort : boolean
986-
Whether or not to sort the joined index
980+
axis : 0 or 1
981+
The axis index object to join (0 - rows, 1 - columns).
982+
indexes : list(Index)
983+
The indexes to join on.
984+
how : {'left', 'right', 'inner', 'outer'}
985+
The type of join to make.
986+
sort : boolean
987+
Whether or not to sort the joined index
987988
988989
Returns
989990
-------
990-
Index
991-
Joined indices.
991+
(Index, func)
992+
Joined index and the make_reindexer function.
992993
"""
994+
assert isinstance(indexes, list)
993995

994-
def merge_index(obj1, obj2):
996+
# define helper functions
997+
def merge(left_index, right_index):
995998
if axis == 1 and how == "outer" and not sort:
996-
return obj1.union(obj2, sort=False)
999+
return left_index.union(right_index, sort=False)
9971000
else:
998-
return obj1.join(obj2, how=how, sort=sort)
999-
1000-
if isinstance(other_index, list):
1001-
joined_obj = self.columns if axis else self.index
1002-
# TODO: revisit for performance
1003-
for obj in other_index:
1004-
joined_obj = merge_index(joined_obj, obj)
1005-
return joined_obj
1006-
if axis:
1007-
return merge_index(self.columns, other_index)
1001+
return left_index.join(right_index, how=how, sort=sort)
1002+
1003+
# define condition for joining indexes
1004+
do_join_index = False
1005+
for index in indexes[1:]:
1006+
if not indexes[0].equals(index):
1007+
do_join_index = True
1008+
break
1009+
1010+
# define condition for joining indexes with getting indexers
1011+
is_duplicates = any(not index.is_unique for index in indexes) and axis == 0
1012+
indexers = []
1013+
if is_duplicates:
1014+
indexers = [None] * len(indexes)
1015+
1016+
# perform joining indexes
1017+
if do_join_index:
1018+
if len(indexes) == 2 and is_duplicates:
1019+
# in case the number of indexes is greater than 2, we should join all indexes
1020+
# after that get indexers
1021+
# in the fast path we can obtain joined_index and indexers in one call
1022+
joined_index, indexers[0], indexers[1] = indexes[0].join(
1023+
indexes[1], how=how, sort=sort, return_indexers=True
1024+
)
1025+
else:
1026+
joined_index = indexes[0]
1027+
# TODO: revisit for performance
1028+
for index in indexes[1:]:
1029+
joined_index = merge(joined_index, index)
1030+
1031+
if is_duplicates:
1032+
for i, index in enumerate(indexes):
1033+
indexers[i] = index.get_indexer_for(joined_index)
10081034
else:
1009-
return self.index.join(other_index, how=how, sort=sort)
1035+
joined_index = indexes[0].copy()
1036+
1037+
def make_reindexer(do_reindex: bool, frame_idx: int):
1038+
# the order of the frames must match the order of the indexes
1039+
if not do_reindex:
1040+
return lambda df: df
1041+
1042+
if is_duplicates:
1043+
assert indexers != []
1044+
1045+
return lambda df: df._reindex_with_indexers(
1046+
{0: [joined_index, indexers[frame_idx]]},
1047+
copy=True,
1048+
allow_dups=True,
1049+
)
1050+
1051+
return lambda df: df.reindex(joined_index, axis=axis)
1052+
1053+
return joined_index, make_reindexer
10101054

10111055
# Internal methods
10121056
# These methods are for building the correct answer in a modular way.
@@ -1697,19 +1741,19 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
16971741
16981742
Parameters
16991743
----------
1700-
axis : 0 or 1
1701-
The axis to copartition along (0 - rows, 1 - columns).
1702-
other : BasePandasFrame
1703-
The other dataframes(s) to copartition against.
1704-
how : str
1705-
How to manage joining the index object ("left", "right", etc.)
1706-
sort : boolean
1707-
Whether or not to sort the joined index.
1708-
force_repartition : boolean
1709-
Whether or not to force the repartitioning. By default,
1710-
this method will skip repartitioning if it is possible. This is because
1711-
reindexing is extremely inefficient. Because this method is used to
1712-
`join` or `append`, it is vital that the internal indices match.
1744+
axis : 0 or 1
1745+
The axis to copartition along (0 - rows, 1 - columns).
1746+
other : BasePandasFrame
1747+
The other dataframes(s) to copartition against.
1748+
how : str
1749+
How to manage joining the index object ("left", "right", etc.)
1750+
sort : boolean
1751+
Whether or not to sort the joined index.
1752+
force_repartition : bool, default False
1753+
Whether or not to force the repartitioning. By default,
1754+
this method will skip repartitioning if it is possible. This is because
1755+
reindexing is extremely inefficient. Because this method is used to
1756+
`join` or `append`, it is vital that the internal indices match.
17131757
17141758
Returns
17151759
-------
@@ -1719,79 +1763,62 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
17191763
if isinstance(other, type(self)):
17201764
other = [other]
17211765

1722-
is_aligning_applied = False
1723-
for i in range(len(other)):
1724-
if (
1725-
len(self._partitions) != len(other[i]._partitions)
1726-
and len(self.axes[0]) == len(other[i].axes[0])
1727-
and axis == 0
1728-
):
1729-
is_aligning_applied = True
1730-
self._partitions = self._frame_mgr_cls.map_axis_partitions(
1731-
axis, self._partitions, lambda df: df
1732-
)
1733-
other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions(
1734-
axis, other[i]._partitions, lambda df: df
1735-
)
1736-
1737-
if (
1738-
all(o.axes[axis].equals(self.axes[axis]) for o in other)
1739-
and not is_aligning_applied
1740-
):
1741-
return (
1742-
self._partitions,
1743-
[self._simple_shuffle(axis, o) for o in other],
1744-
self.axes[axis].copy(),
1745-
)
1766+
# define helper functions
1767+
def get_axis_lengths(partitions, axis):
1768+
if axis:
1769+
return [obj.width() for obj in partitions[0]]
1770+
return [obj.length() for obj in partitions.T[0]]
17461771

1747-
index_other_obj = [o.axes[axis] for o in other]
1748-
joined_index = self._join_index_objects(axis, index_other_obj, how, sort)
1749-
# We have to set these because otherwise when we perform the functions it may
1750-
# end up serializing this entire object.
1751-
left_old_idx = self.axes[axis]
1752-
right_old_idxes = index_other_obj
1772+
self_index = self.axes[axis]
1773+
others_index = [o.axes[axis] for o in other]
1774+
joined_index, make_reindexer = self._join_index_objects(
1775+
axis, [self_index] + others_index, how, sort
1776+
)
17531777

1754-
def make_map_func():
1755-
if not joined_index.is_unique and axis == 0:
1756-
return lambda df: df
1757-
return lambda df: df.reindex(joined_index, axis=axis)
1778+
# define conditions for reindexing and repartitioning `self` frame
1779+
do_reindex_self = not self_index.equals(joined_index)
1780+
do_repartition_self = force_repartition or do_reindex_self
17581781

1759-
# Start with this and we'll repartition the first time, and then not again.
1760-
if is_aligning_applied or (
1761-
not force_repartition and left_old_idx.equals(joined_index)
1762-
):
1763-
reindexed_self = self._partitions
1764-
else:
1782+
# perform repartitioning and reindexing for `self` frame if needed
1783+
if do_repartition_self:
17651784
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
17661785
axis,
17671786
self._partitions,
1768-
make_map_func(),
1787+
# self frame has 0 idx
1788+
make_reindexer(do_reindex_self, 0),
17691789
)
1790+
else:
1791+
reindexed_self = self._partitions
17701792

1771-
def get_column_widths(partitions):
1772-
if len(partitions) > 0:
1773-
return [obj.width() for obj in partitions[0]]
1793+
# define lengths of `self` and `other` frames for alignment purposes
1794+
self_lengths = get_axis_lengths(reindexed_self, axis)
1795+
others_lengths = [o._axes_lengths[axis] for o in other]
17741796

1775-
def get_row_lengths(partitions):
1776-
if len(partitions.T) > 0:
1777-
return [obj.length() for obj in partitions.T[0]]
1797+
# define conditions for reindexing and repartitioning `other` frames
1798+
do_reindex_others = [not index.equals(joined_index) for index in others_index]
17781799

1779-
reindexed_other_list = []
1800+
do_repartition_others = [None] * len(other)
17801801
for i in range(len(other)):
1781-
if is_aligning_applied or (
1782-
not force_repartition and right_old_idxes[i].equals(joined_index)
1783-
):
1784-
reindexed_other = other[i]._partitions
1785-
else:
1786-
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
1802+
do_repartition_others[i] = (
1803+
force_repartition
1804+
or do_reindex_others[i]
1805+
or others_lengths[i] != self_lengths
1806+
)
1807+
1808+
# perform repartitioning and reindexing for `other` frames if needed
1809+
reindexed_other_list = [None] * len(other)
1810+
for i in range(len(other)):
1811+
if do_repartition_others[i]:
1812+
reindexed_other_list[i] = other[i]._frame_mgr_cls.map_axis_partitions(
17871813
axis,
17881814
other[i]._partitions,
1789-
make_map_func(),
1790-
lengths=get_row_lengths(reindexed_self)
1791-
if axis == 0
1792-
else get_column_widths(reindexed_self),
1815+
# indices of `other` frames start from 1 (0 is the `self` frame)
1816+
make_reindexer(do_reindex_others[i], 1 + i),
1817+
lengths=self_lengths,
17931818
)
1794-
reindexed_other_list.append(reindexed_other)
1819+
else:
1820+
reindexed_other_list[i] = other[i]._partitions
1821+
17951822
return reindexed_self, reindexed_other_list, joined_index
17961823

17971824
def _simple_shuffle(self, axis, other):
@@ -1900,7 +1927,7 @@ def _concat(self, axis, others, how, sort):
19001927
]
19011928
else:
19021929
left_parts, right_parts, joined_index = self._copartition(
1903-
axis ^ 1, others, how, sort, force_repartition=True
1930+
axis ^ 1, others, how, sort, force_repartition=False
19041931
)
19051932
new_lengths = None
19061933
new_widths = None

modin/engines/base/frame/partition_manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def broadcast_axis_partitions(
238238
right : The right partitions.
239239
keep_partitioning : boolean. Default is False
240240
The flag to keep partitions for Modin Frame.
241-
lengths : list(int)
241+
lengths : list(int), default None
242242
The list of lengths to shuffle the object.
243243
244244
Returns
@@ -250,6 +250,8 @@ def broadcast_axis_partitions(
250250
# partitions as best we can right now.
251251
if keep_partitioning:
252252
num_splits = len(left) if axis == 0 else len(left.T)
253+
elif lengths:
254+
num_splits = len(lengths)
253255
else:
254256
num_splits = cls._compute_num_partitions()
255257
preprocessed_map_func = cls.preprocess_func(apply_func)

modin/test/backends/pandas/test_internals.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
# governing permissions and limitations under the License.
1313

1414
import modin.pandas as pd
15+
from modin.pandas.test.utils import create_test_dfs
16+
17+
pd.DEFAULT_NPARTITIONS = 4
1518

1619

1720
def test_aligning_blocks():
@@ -38,3 +41,14 @@ def test_aligning_blocks_with_duplicated_index():
3841
df2 = pd.DataFrame(data21).append(pd.DataFrame(data22))
3942

4043
repr(df1 - df2)
44+
45+
46+
def test_aligning_partitions():
47+
data = [0, 1, 2, 3, 4, 5]
48+
modin_df1, _ = create_test_dfs({"a": data, "b": data})
49+
modin_df = modin_df1.loc[:2]
50+
51+
modin_df2 = modin_df.append(modin_df)
52+
53+
modin_df2["c"] = modin_df1["b"]
54+
repr(modin_df2)

0 commit comments

Comments
 (0)