REFACTOR-#2648: Correct uses of MapReduceFunction and metadata manipu… #2655

Merged: 3 commits, Jan 28, 2021
Changes from all commits
4 changes: 2 additions & 2 deletions modin/backends/base/query_compiler.py
@@ -493,14 +493,14 @@ def reset_index(self, **kwargs):
     # we will implement a Distributed Series, and this will be returned
     # instead.

-    def is_monotonic(self):
+    def is_monotonic_increasing(self):
         """Return boolean if values in the object are monotonic_increasing.

         Returns
         -------
         bool
         """
-        return SeriesDefault.register(pandas.Series.is_monotonic)(self)
+        return SeriesDefault.register(pandas.Series.is_monotonic_increasing)(self)

     def is_monotonic_decreasing(self):
         """Return boolean if values in the object are monotonic_decreasing.
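Note: in pandas, `Series.is_monotonic` was simply an alias of `Series.is_monotonic_increasing` (the alias was later deprecated), so the rename above aligns the query-compiler API with the explicit pandas name. A rough sketch of the defaulting pattern this method relies on, using a hypothetical `series_default` helper standing in for Modin's `SeriesDefault.register`, plain pandas only:

```python
import pandas

def series_default(attr_name):
    # Hypothetical stand-in for SeriesDefault.register: materialize the data
    # as one pandas Series, then evaluate the named attribute on it.
    def caller(data):
        result = getattr(pandas.Series(data), attr_name)
        return result() if callable(result) else result

    return caller

is_monotonic_increasing = series_default("is_monotonic_increasing")
print(is_monotonic_increasing([1, 2, 3]))  # True
print(is_monotonic_increasing([3, 1, 2]))  # False
```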
234 changes: 41 additions & 193 deletions modin/backends/pandas/query_compiler.py
@@ -19,10 +19,8 @@
     is_list_like,
     is_numeric_dtype,
     is_datetime_or_timedelta_dtype,
-    is_scalar,
 )
 from pandas.core.base import DataError
-from typing import Type, Callable
 from collections.abc import Iterable, Container
 import warnings

@@ -31,7 +29,6 @@
 from modin.error_message import ErrorMessage
 from modin.utils import try_cast_to_pandas, wrap_udf_function, hashable
 from modin.data_management.functions import (
-    Function,
     FoldFunction,
     MapFunction,
     MapReduceFunction,
@@ -155,35 +152,6 @@ def caller(df, *args, **kwargs):
     return caller


-def _numeric_only_reduce_fn(applier: Type[Function], *funcs) -> Callable:
-    """
-    Build reduce function for statistic operations with `numeric_only` parameter.
-
-    Parameters
-    ----------
-    applier: Callable
-        Function object to register `funcs`
-    *funcs: list
-        List of functions to register in `applier`
-
-    Returns
-    -------
-    callable
-        A callable function to be applied in the partitions
-    """
-
-    def caller(self, *args, **kwargs):
-        # If `numeric_only` is None and the frame contains non-numeric columns,
-        # then we don't know what columns/indices will be dropped at the result
-        # of reduction function, and so can't preserve labels
-        preserve_index = kwargs.get("numeric_only", None) is not None
-        return applier.register(*funcs, preserve_index=preserve_index)(
-            self, *args, **kwargs
-        )
-
-    return caller
-
-
 class PandasQueryCompiler(BaseQueryCompiler):
     """This class implements the logic necessary for operating on partitions
     with a Pandas backend. This logic is specific to Pandas."""
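For context on the deleted `_numeric_only_reduce_fn`: its inner comment points at a real pandas behavior. With `numeric_only=None`, pandas inspects dtypes and silently drops columns a reduction cannot handle, so the result's labels depend on the data and a distributed reducer cannot preserve them up front. A standalone illustration, assuming pandas 1.x defaults (later pandas versions reworked `numeric_only`):

```python
import pandas

df = pandas.DataFrame({"a": [1.0, 2.0], "b": ["x", "y"]})

# numeric_only=True: surviving labels are knowable from the arguments alone.
print(df.median(numeric_only=True).index.tolist())  # ['a']

# numeric_only=None (the old 1.x default): pandas drops "b" after inspecting
# dtypes, so the surviving labels are data-dependent.
print(df.median().index.tolist())  # ['a'], but only known after the fact
```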
@@ -625,46 +593,23 @@ def is_series_like(self):

     # MapReduce operations

-    def _is_monotonic(self, func_type=None):
-        funcs = {
-            "increasing": lambda df: df.is_monotonic_increasing,
-            "decreasing": lambda df: df.is_monotonic_decreasing,
-        }
-
-        monotonic_fn = funcs.get(func_type, funcs["increasing"])
-
-        def is_monotonic_map(df):
-            df = df.squeeze(axis=1)
-            return [monotonic_fn(df), df.iloc[0], df.iloc[len(df) - 1]]
-
-        def is_monotonic_reduce(df):
-            df = df.squeeze(axis=1)
-
-            common_case = df[0].all()
-            left_edges = df[1]
-            right_edges = df[2]
-
-            edges_list = []
-            for i in range(len(left_edges)):
-                edges_list.extend([left_edges.iloc[i], right_edges.iloc[i]])
-
-            edge_case = monotonic_fn(pandas.Series(edges_list))
-            return [common_case and edge_case]
-
-        return MapReduceFunction.register(
-            is_monotonic_map, is_monotonic_reduce, axis=0
-        )(self)
-
-    def is_monotonic_decreasing(self):
-        return self._is_monotonic(func_type="decreasing")
-
-    is_monotonic = _is_monotonic
+    def is_monotonic_decreasing(self):
+        def is_monotonic_decreasing(df):
+            return pandas.DataFrame([df.squeeze(axis=1).is_monotonic_decreasing])
+
+        return self.default_to_pandas(is_monotonic_decreasing)
+
+    def is_monotonic_increasing(self):
+        def is_monotonic_increasing(df):
+            return pandas.DataFrame([df.squeeze(axis=1).is_monotonic_increasing])
+
+        return self.default_to_pandas(is_monotonic_increasing)

     count = MapReduceFunction.register(pandas.DataFrame.count, pandas.DataFrame.sum)
-    max = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.max)
-    min = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.min)
-    sum = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.sum)
-    prod = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.prod)
+    max = MapReduceFunction.register(pandas.DataFrame.max)
+    min = MapReduceFunction.register(pandas.DataFrame.min)
+    sum = MapReduceFunction.register(pandas.DataFrame.sum)
+    prod = MapReduceFunction.register(pandas.DataFrame.prod)
     any = MapReduceFunction.register(pandas.DataFrame.any, pandas.DataFrame.any)
     all = MapReduceFunction.register(pandas.DataFrame.all, pandas.DataFrame.all)
     memory_usage = MapReduceFunction.register(
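The removed `_is_monotonic` combined two facts: a column split into partitions is monotonic iff every partition is monotonic and the stitched sequence of partition edge values is monotonic. The PR replaces it with `default_to_pandas`, which materializes the whole column instead. The divide-and-conquer idea on its own, as a sketch with a hypothetical `is_monotonic_increasing_chunked` in plain pandas:

```python
import pandas

def is_monotonic_increasing_chunked(chunks):
    edge_values = []
    for chunk in chunks:  # "map" step: check each partition locally
        if not chunk.is_monotonic_increasing:
            return False
        edge_values.extend([chunk.iloc[0], chunk.iloc[-1]])
    # "reduce" step: the first/last values of consecutive partitions
    # must themselves form a non-decreasing sequence.
    return pandas.Series(edge_values).is_monotonic_increasing

print(is_monotonic_increasing_chunked(
    [pandas.Series([1, 2, 3]), pandas.Series([3, 4, 5])]
))  # True
print(is_monotonic_increasing_chunked(
    [pandas.Series([1, 5]), pandas.Series([2, 3])]
))  # False: each chunk is monotonic, but the boundary 5 -> 2 is not
```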
@@ -716,60 +661,26 @@ def value_counts(self, **kwargs):
         -------
         PandasQueryCompiler
         """
-        if kwargs.get("bins", None) is not None:
-            new_modin_frame = self._modin_frame._apply_full_axis(
-                0, lambda df: df.squeeze(axis=1).value_counts(**kwargs)
-            )
-            return self.__constructor__(new_modin_frame)
-
-        def map_func(df, *args, **kwargs):
+        def value_counts(df):
             return df.squeeze(axis=1).value_counts(**kwargs).to_frame()

-        def reduce_func(df, *args, **kwargs):
-            normalize = kwargs.get("normalize", False)
-            sort = kwargs.get("sort", True)
-            ascending = kwargs.get("ascending", False)
-            dropna = kwargs.get("dropna", True)
-
-            try:
-                result = (
-                    df.squeeze(axis=1)
-                    .groupby(df.index, sort=False, dropna=dropna)
-                    .sum()
-                )
-            # This will happen with Arrow buffer read-only errors. We don't want to copy
-            # all the time, so this will try to fast-path the code first.
-            except (ValueError):
-                result = (
-                    df.copy()
-                    .squeeze(axis=1)
-                    .groupby(df.index, sort=False, dropna=dropna)
-                    .sum()
-                )
-
-            if normalize:
-                result = result / df.squeeze(axis=1).sum()
-
-            return result.sort_values(ascending=ascending) if sort else result
-
-        return MapReduceFunction.register(
-            map_func, reduce_func, axis=0, preserve_index=False
-        )(self, **kwargs)
+        return self.default_to_pandas(value_counts)

     # END MapReduce operations

     # Reduction operations
     idxmax = ReductionFunction.register(pandas.DataFrame.idxmax)
     idxmin = ReductionFunction.register(pandas.DataFrame.idxmin)
-    median = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.median)
+    median = ReductionFunction.register(pandas.DataFrame.median)
     nunique = ReductionFunction.register(pandas.DataFrame.nunique)
-    skew = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.skew)
-    kurt = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.kurt)
-    sem = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sem)
-    std = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.std)
-    var = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.var)
-    sum_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sum)
-    prod_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.prod)
+    skew = ReductionFunction.register(pandas.DataFrame.skew)
+    kurt = ReductionFunction.register(pandas.DataFrame.kurt)
+    sem = ReductionFunction.register(pandas.DataFrame.sem)
+    std = ReductionFunction.register(pandas.DataFrame.std)
+    var = ReductionFunction.register(pandas.DataFrame.var)
+    sum_min_count = ReductionFunction.register(pandas.DataFrame.sum)
+    prod_min_count = ReductionFunction.register(pandas.DataFrame.prod)
     quantile_for_single_value = ReductionFunction.register(pandas.DataFrame.quantile)
     mad = ReductionFunction.register(pandas.DataFrame.mad)
     to_datetime = ReductionFunction.register(
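The removed `reduce_func` merged per-partition `value_counts` frames by grouping on the index and summing, applying `normalize` only after the global sum. That merging idea in isolation, as a sketch with a hypothetical `value_counts_chunked` (plain pandas; `dropna`, `bins`, and the Arrow read-only fallback are omitted):

```python
import pandas

def value_counts_chunked(chunks, normalize=False):
    partials = [chunk.value_counts() for chunk in chunks]  # "map" step
    combined = pandas.concat(partials)
    # "reduce" step: the same value may appear in several partial counts,
    # so group duplicate index labels and sum them.
    result = combined.groupby(combined.index).sum()
    if normalize:
        result = result / sum(len(chunk) for chunk in chunks)
    return result.sort_values(ascending=False)

parts = [pandas.Series(["a", "b", "a"]), pandas.Series(["b", "b"])]
print(value_counts_chunked(parts))                  # b: 3, a: 2
print(value_counts_chunked(parts, normalize=True))  # b: 0.6, a: 0.4
```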
@@ -1350,83 +1261,13 @@ def searchsorted(self, **kwargs):
         PandasQueryCompiler
         """

-        def map_func(part, *args, **kwargs):
-
-            elements_number = len(part.index)
-            assert elements_number > 0, "Wrong mapping behaviour of MapReduce"
-
-            # unify value type
-            value = kwargs.pop("value")
-            value = np.array([value]) if is_scalar(value) else value
-
-            if elements_number == 1:
-                part = part[part.columns[0]]
-            else:
-                part = part.squeeze()
-
-            part_index_start = part.index.start
-            part_index_stop = part.index.stop
-
-            result = part.searchsorted(value=value, *args, **kwargs)
-
-            processed_results = {}
-            value_number = 0
-            for value_result in result:
-                value_result += part_index_start
-
-                if value_result > part_index_start and value_result < part_index_stop:
-                    processed_results[f"value{value_number}"] = {
-                        "relative_location": "current_partition",
-                        "index": value_result,
-                    }
-                elif value_result <= part_index_start:
-                    processed_results[f"value{value_number}"] = {
-                        "relative_location": "previoius_partitions",
-                        "index": part_index_start,
-                    }
-                else:
-                    processed_results[f"value{value_number}"] = {
-                        "relative_location": "next_partitions",
-                        "index": part_index_stop,
-                    }
-
-                value_number += 1
-
-            return pandas.DataFrame(processed_results)
-
-        def reduce_func(map_results, *args, **kwargs):
-            def get_value_index(value_result):
-                value_result_grouped = value_result.groupby(level=0)
-                rel_location = value_result_grouped.get_group("relative_location")
-                ind = value_result_grouped.get_group("index")
-                # executes if result is inside of the mapped part
-                if "current_partition" in rel_location.values:
-                    assert (
-                        rel_location[rel_location == "current_partition"].count() == 1
-                    ), "Each value should have single result"
-                    return ind[rel_location.values == "current_partition"]
-                # executes if result is between mapped parts
-                elif rel_location.nunique(dropna=False) > 1:
-                    return ind[rel_location.values == "previoius_partitions"][0]
-                # executes if result is outside of the mapped part
-                else:
-                    if "next_partitions" in rel_location.values:
-                        return ind[-1]
-                    else:
-                        return ind[0]
-
-            map_results_parsed = map_results.apply(
-                lambda ser: get_value_index(ser)
-            ).squeeze()
-
-            if isinstance(map_results_parsed, pandas.Series):
-                map_results_parsed = map_results_parsed.to_list()
-
-            return pandas.Series(map_results_parsed)
-
-        return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
-            self, **kwargs
-        )
+        def searchsorted(df):
+            result = df.squeeze(axis=1).searchsorted(**kwargs)
+            if not is_list_like(result):
+                result = [result]
+            return pandas.DataFrame(result)
+
+        return self.default_to_pandas(searchsorted)

     # Dt map partitions operations

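The removed `map_func` turned each partition's local `searchsorted` position into a global one by adding the partition's starting offset, then labeled boundary hits for the reducer to resolve. The offset arithmetic on its own, as a sketch with a hypothetical `searchsorted_chunked` over sorted NumPy chunks (scalar value, side='left' only):

```python
import numpy as np

def searchsorted_chunked(chunks, value):
    offset = 0  # global position of the current chunk's first element
    for chunk in chunks:  # chunks: consecutive, globally sorted 1-D arrays
        local = int(np.searchsorted(chunk, value))
        if local < len(chunk):
            return offset + local  # insertion point falls inside this chunk
        offset += len(chunk)  # otherwise it belongs to a later chunk
    return offset  # value is greater than every element

parts = [np.array([1, 3, 5]), np.array([7, 9])]
print(searchsorted_chunked(parts, 6))  # 3, same as np.searchsorted([1, 3, 5, 7, 9], 6)
```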
@@ -1456,12 +1297,19 @@ def get_value_index(value_result):
     dt_is_leap_year = MapFunction.register(_dt_prop_map("is_leap_year"))
     dt_daysinmonth = MapFunction.register(_dt_prop_map("daysinmonth"))
     dt_days_in_month = MapFunction.register(_dt_prop_map("days_in_month"))
-    dt_tz = MapReduceFunction.register(
-        _dt_prop_map("tz"), lambda df: pandas.DataFrame(df.iloc[0]), axis=0
-    )
-    dt_freq = MapReduceFunction.register(
-        _dt_prop_map("freq"), lambda df: pandas.DataFrame(df.iloc[0]), axis=0
-    )
+
+    def dt_tz(self):
+        def datetime_tz(df):
+            return pandas.DataFrame([df.squeeze(axis=1).dt.tz])
+
+        return self.default_to_pandas(datetime_tz)
+
+    def dt_freq(self):
+        def datetime_freq(df):
+            return pandas.DataFrame([df.squeeze(axis=1).dt.freq])
+
+        return self.default_to_pandas(datetime_freq)
+
     dt_to_period = MapFunction.register(_dt_func_map("to_period"))
     dt_to_pydatetime = MapFunction.register(_dt_func_map("to_pydatetime"))
     dt_tz_localize = MapFunction.register(_dt_func_map("tz_localize"))
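`dt.tz` and `dt.freq` differ from the other `dt_*` operations above: they are single scalars describing the whole column, not elementwise results, so a per-partition `MapFunction`/`MapReduceFunction` is the wrong shape for them. What the new inner callback does, shown standalone on plain pandas (assuming a one-column frame, as the `squeeze(axis=1)` in the diff implies):

```python
import pandas

def datetime_tz(df):
    # squeeze the one-column frame to a Series, read the column-level scalar
    return pandas.DataFrame([df.squeeze(axis=1).dt.tz])

ser = pandas.Series(pandas.date_range("2021-01-01", periods=3, tz="UTC"))
print(datetime_tz(ser.to_frame()))  # 1x1 frame holding the UTC tzinfo
```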