Skip to content

Commit

Permalink
FIX-modin-project#2374: fix binary operations
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Dec 3, 2020
1 parent 8251234 commit b881a25
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
19 changes: 15 additions & 4 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1690,7 +1690,7 @@ def broadcast_apply_full_axis(
)

def _copartition(
self, axis, other, how, sort, force_repartition=False, reindex=True
self, axis, other, how, sort, force_repartition=False, reindexer=None
):
"""
Copartition two dataframes.
Expand Down Expand Up @@ -1736,9 +1736,20 @@ def _copartition(
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj

def make_map_func(index):
def make_map_func(index, left=True):
if index.equals(joined_index):
return lambda df: df
if reindexer == "binary":
# case for binary operation with duplicate values; way from pandas
_join_index, ilidx, iridx = self.axes[axis].join(
other[0].axes[axis], how=how, sort=sort, return_indexers=True
)

return lambda df: df._reindex_with_indexers(
{axis: [_join_index, ilidx if left else iridx]},
copy=False,
allow_dups=True,
)
return lambda df: df.reindex(joined_index, axis=axis)

# Start with this and we'll repartition the first time, and then not again.
Expand Down Expand Up @@ -1767,7 +1778,7 @@ def get_row_lengths(partitions):
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
make_map_func(right_old_idxes[i]),
make_map_func(right_old_idxes[i], left=False),
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
Expand Down Expand Up @@ -1830,7 +1841,7 @@ def _binary_op(self, op, right_frame, join_type="outer"):
right_frame,
join_type,
sort=True,
reindex=False,
reindexer="binary",
)
# unwrap list returned by `copartition`.
# import pdb;pdb.set_trace()
Expand Down
2 changes: 1 addition & 1 deletion modin/test/backends/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_aligning_blocks_with_duplicated_index():

data21 = [0]
data22 = [1, 2, 3]
# import pandas as pd

df1 = pd.DataFrame(data11).append(pd.DataFrame(data12))
df2 = pd.DataFrame(data21).append(pd.DataFrame(data22))

Expand Down

0 comments on commit b881a25

Please sign in to comment.