Skip to content

Commit

Permalink
REFACTOR-modin-project#1763: Move logic of merge (modin-project#1764)
Browse files Browse the repository at this point in the history
into the query compiler

Signed-off-by: Igoshev, Yaroslav <yaroslav.igoshev@intel.com>
  • Loading branch information
YarShev authored Jul 23, 2020
1 parent 77a2994 commit 8658f09
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 80 deletions.
72 changes: 64 additions & 8 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,55 @@ def merge(self, right, **kwargs):
-----
See pd.merge or pd.DataFrame.merge for more info on kwargs.
"""
right = right.to_pandas()
how = kwargs.get("how", "inner")
on = kwargs.get("on", None)
left_on = kwargs.get("left_on", None)
right_on = kwargs.get("right_on", None)
left_index = kwargs.get("left_index", False)
right_index = kwargs.get("right_index", False)
sort = kwargs.get("sort", False)

sort = kwargs.get("sort")
kwargs["sort"] = not sort if sort else sort
if how in ["left", "inner"] and left_index is False and right_index is False:
right = right.to_pandas()

def map_func(left, right=right, kwargs=kwargs):
return pandas.merge(left, right, **kwargs)
kwargs["sort"] = False

new_modin_frame = self._modin_frame._apply_full_axis(1, map_func)
return self.__constructor__(new_modin_frame)
def map_func(left, right=right, kwargs=kwargs):
return pandas.merge(left, right, **kwargs)

new_self = self.__constructor__(
self._modin_frame._apply_full_axis(1, map_func)
)
is_reset_index = True
if left_on and right_on:
left_on = left_on if is_list_like(left_on) else [left_on]
right_on = right_on if is_list_like(right_on) else [right_on]
is_reset_index = (
False
if any(o in new_self.index.names for o in left_on)
and any(o in right.index.names for o in right_on)
else True
)
if sort:
new_self = (
new_self.sort_rows_by_column_values(left_on.append(right_on))
if is_reset_index
else new_self.sort_index(axis=0, level=left_on.append(right_on))
)
if on:
on = on if is_list_like(on) else [on]
is_reset_index = not any(
o in new_self.index.names and o in right.index.names for o in on
)
if sort:
new_self = (
new_self.sort_rows_by_column_values(on)
if is_reset_index
else new_self.sort_index(axis=0, level=on)
)
return new_self.reset_index(drop=True) if is_reset_index else new_self
else:
return self.default_to_pandas(pandas.DataFrame.merge, right, **kwargs)

# END Inter-Data operations

Expand Down Expand Up @@ -1504,6 +1543,21 @@ def sort_index(self, **kwargs):
QueryCompiler containing the data sorted by columns or indices.
"""
axis = kwargs.pop("axis", 0)
level = kwargs.pop("level", None)
sort_remaining = kwargs.pop("sort_remaining", True)
kwargs["inplace"] = False

if level is not None or (
(axis == 0 and isinstance(self.index, pandas.MultiIndex))
or (axis == 1 and isinstance(self.columns, pandas.MultiIndex))
):
return self.default_to_pandas(
pandas.DataFrame.sort_index,
level=level,
sort_remaining=sort_remaining,
**kwargs
)

# sort_index can have ascending be None and behaves as if it is False.
# sort_values cannot have ascending be None. Thus, the following logic is to
# convert the ascending argument to one that works with sort_values
Expand All @@ -1519,7 +1573,9 @@ def sort_index(self, **kwargs):
new_columns = self.columns
new_modin_frame = self._modin_frame._apply_full_axis(
axis,
lambda df: df.sort_index(axis=axis, **kwargs),
lambda df: df.sort_index(
axis=axis, level=level, sort_remaining=sort_remaining, **kwargs
),
new_index,
new_columns,
dtypes="copy" if axis == 0 else None,
Expand Down
4 changes: 2 additions & 2 deletions modin/data_management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None)

def length_fn_pandas(df):
assert isinstance(df, pandas.DataFrame)
return len(df) if len(df.columns) > 0 else 0
return len(df) if len(df) > 0 else 0


def width_fn_pandas(df):
assert isinstance(df, pandas.DataFrame)
return len(df.columns) if len(df) > 0 else 0
return len(df.columns) if len(df.columns) > 0 else 0
30 changes: 10 additions & 20 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2800,28 +2800,18 @@ def sort_index(
A sorted DataFrame
"""
axis = self._get_axis_number(axis)
if level is not None or (
(axis == 0 and isinstance(self.index, pandas.MultiIndex))
or (axis == 1 and isinstance(self.columns, pandas.MultiIndex))
):
new_query_compiler = self._default_to_pandas(
"sort_index",
axis=axis,
level=level,
ascending=ascending,
inplace=False,
kind=kind,
na_position=na_position,
sort_remaining=sort_remaining,
)._query_compiler
return self._create_or_update_from_compiler(new_query_compiler, inplace)
inplace = validate_bool_kwarg(inplace, "inplace")
new_query_compiler = self._query_compiler.sort_index(
axis=axis, ascending=ascending, kind=kind, na_position=na_position
axis=axis,
level=level,
ascending=ascending,
inplace=inplace,
kind=kind,
na_position=na_position,
sort_remaining=sort_remaining,
ignore_index=ignore_index,
)
if inplace:
self._update_inplace(new_query_compiler=new_query_compiler)
else:
return self.__constructor__(query_compiler=new_query_compiler)
return self._create_or_update_from_compiler(new_query_compiler, inplace)

def sort_values(
self,
Expand Down
51 changes: 1 addition & 50 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1491,57 +1491,8 @@ def merge(
right, how=how, lsuffix=suffixes[0], rsuffix=suffixes[1], sort=sort
)

if how in ["left", "inner"] and left_index is False and right_index is False:
result = self.__constructor__(
query_compiler=self._query_compiler.merge(
right._query_compiler,
how=how,
on=on,
left_on=left_on,
right_on=right_on,
left_index=left_index,
right_index=right_index,
sort=sort,
suffixes=suffixes,
copy=copy,
indicator=indicator,
validate=validate,
)
)

is_reset_index = True
if left_on and right_on:
left_on = left_on if is_list_like(left_on) else [left_on]
right_on = right_on if is_list_like(right_on) else [right_on]
is_reset_index = (
False
if any(o in self.index.names for o in left_on)
and any(o in right.index.names for o in right_on)
else True
)
if sort:
result = (
result.sort_values(left_on.append(right_on))
if is_reset_index
else result.sort_index(axis=0, level=left_on.append(right_on))
)
if on:
on = on if is_list_like(on) else [on]
is_reset_index = not any(
o in self.index.names and o in right.index.names for o in on
)
if sort:
result = (
result.sort_values(on)
if is_reset_index
else result.sort_index(axis=0, level=on)
)

return result.reset_index(drop=True) if is_reset_index else result

return self.__constructor__(
query_compiler=self._query_compiler.default_to_pandas(
pandas.DataFrame.merge,
query_compiler=self._query_compiler.merge(
right._query_compiler,
how=how,
on=on,
Expand Down
9 changes: 9 additions & 0 deletions modin/pandas/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5777,6 +5777,15 @@ def test_merge(self, test_data, test_data2):
)
df_equals(modin_result, pandas_result)

# Test for issue #1771
modin_df = pd.DataFrame({"name": np.arange(40)})
modin_df2 = pd.DataFrame({"name": [39], "position": [0]})
pandas_df = pandas.DataFrame({"name": np.arange(40)})
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
df_equals(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
"col2": [4, 5, 6, 7],
Expand Down

0 comments on commit 8658f09

Please sign in to comment.