Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#2214, FEAT-#1233, FEAT-#1185: Add align implementation for DataFrame and Series #2242

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,26 @@ def where(self, cond, other, **kwargs):
self, cond=cond, other=other, **kwargs
)

def align(self, other, **kwargs):
"""
Align two objects on their axes with the specified join method.

Parameters
----------
other : BaseQueryCompiler
The query compiler of other.
**kwargs
See pd.DataFrame.align for more info on kwargs.

Returns
-------
(BaseQueryCompiler, BaseQueryCompiler)
Tuple of aligned query compilers.
"""
return DataFrameDefault.register(pandas.DataFrame.align)(
self, other=other, **kwargs
)

def merge(self, right, **kwargs):
"""
Merge DataFrame or named Series objects with a database-style join.
Expand Down
65 changes: 53 additions & 12 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,23 @@ def default_to_pandas(self, pandas_op, *args, **kwargs):
}

result = pandas_op(self.to_pandas(), *args, **kwargs)
if isinstance(result, pandas.Series):
if result.name is None:
result.name = "__reduced__"
result = result.to_frame()
if isinstance(result, pandas.DataFrame):
return self.from_pandas(result, type(self._modin_frame))

def compute_result(result):
if isinstance(result, pandas.Series):
if result.name is None:
result.name = "__reduced__"
result = result.to_frame()
if isinstance(result, pandas.DataFrame):
return self.from_pandas(result, type(self._modin_frame))
else:
return result

if isinstance(result, tuple) and all(
isinstance(obj, pandas.DataFrame) for obj in result
):
return tuple(compute_result(obj) for obj in result)
else:
return result
return compute_result(result)

def to_pandas(self):
return self._modin_frame.to_pandas()
Expand Down Expand Up @@ -381,6 +390,36 @@ def to_numpy(self, **kwargs):
join_type="left",
)

def align(self, other, **kwargs):
"""
Align two objects on their axes with the specified join method.

Parameters
----------
other : PandasQueryCompiler
The query compiler of other.
**kwargs
See pd.DataFrame.align for more info on kwargs.

Returns
-------
(PandasQueryCompiler, PandasQueryCompiler)
Tuple of aligned query compilers.
"""
other = other.to_pandas()

def map_func(df, other=other):
return pandas.DataFrame.align(df, other, **kwargs)

mf1, mf2 = self._modin_frame._apply_full_axis(
kwargs.get("axis"),
map_func,
wrap_func=False,
num_objs=2,
broadcast=True,
)
return tuple([self.__constructor__(mf1), self.__constructor__(mf2)])

def where(self, cond, other, **kwargs):
"""Gets values from this manager where cond is true else from other.

Expand Down Expand Up @@ -857,7 +896,7 @@ def map_func(df, resample_args=resample_args):
return val

new_modin_frame = self._modin_frame._apply_full_axis(
axis=0, func=map_func, new_columns=new_columns
0, map_func, new_columns=new_columns
)
return self.__constructor__(new_modin_frame)

Expand Down Expand Up @@ -1830,7 +1869,7 @@ def map_func(df, n=n, keep=keep, columns=columns):
new_columns = self.columns

new_modin_frame = self._modin_frame._apply_full_axis(
axis=0, func=map_func, new_columns=new_columns
0, map_func, new_columns=new_columns
)
return self.__constructor__(new_modin_frame)

Expand Down Expand Up @@ -2054,8 +2093,8 @@ def sort_index(self, **kwargs):
lambda df: df.sort_index(
axis=axis, level=level, sort_remaining=sort_remaining, **kwargs
),
new_index,
new_columns,
new_index=new_index,
new_columns=new_columns,
dtypes="copy" if axis == 0 else None,
)
return self.__constructor__(new_modin_frame)
Expand Down Expand Up @@ -2758,7 +2797,9 @@ def applyier(df, other):

result = self.__constructor__(
to_group._modin_frame.broadcast_apply_full_axis(
axis=0, func=applyier, other=keys_columns._modin_frame
keys_columns._modin_frame,
0,
applyier,
)
)

Expand Down
46 changes: 27 additions & 19 deletions modin/data_management/functions/default_methods/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,34 @@ def applyier(df, *args, **kwargs):
df = cls.frame_wrapper(df)
result = fn(df, *args, **kwargs)

if (
not isinstance(result, pandas.Series)
and not isinstance(result, pandas.DataFrame)
and func != "to_numpy"
and func != pandas.DataFrame.to_numpy
def compute_result(result):
if (
not isinstance(result, pandas.Series)
and not isinstance(result, pandas.DataFrame)
and func != "to_numpy"
and func != pandas.DataFrame.to_numpy
):
result = (
pandas.DataFrame(result)
if is_list_like(result)
else pandas.DataFrame([result])
)
if isinstance(result, pandas.Series):
if result.name is None:
result.name = "__reduced__"
result = result.to_frame()

inplace = kwargs.get("inplace", False)
if force_inplace is not None:
inplace = force_inplace
return result if not inplace else df

if isinstance(result, tuple) and all(
isinstance(obj, (pandas.DataFrame, pandas.Series)) for obj in result
):
result = (
pandas.DataFrame(result)
if is_list_like(result)
else pandas.DataFrame([result])
)
if isinstance(result, pandas.Series):
if result.name is None:
result.name = "__reduced__"
result = result.to_frame()

inplace = kwargs.get("inplace", False)
if force_inplace is not None:
inplace = force_inplace
return result if not inplace else df
return tuple(compute_result(obj) for obj in result)
else:
return compute_result(result)

return cls.build_wrapper(applyier, fn_name)

Expand Down
Loading