Skip to content

Commit

Permalink
FIX-#2169: avoid unnecessary index access in groupby (#2469)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev authored Dec 11, 2020
1 parent 1bf35c9 commit 1a8cd0a
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 64 deletions.
42 changes: 28 additions & 14 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,49 +1621,63 @@ def has_multiindex(self, axis=0):
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)

def get_index_name(self):
def get_index_name(self, axis=0):
"""
Get index name.
Get index name of specified axis.
Parameters
----------
axis: int (default 0),
Axis to return index name on.
Returns
-------
hashable
Index name, None for MultiIndex.
"""
return self.index.name
return self.get_axis(axis).name

def set_index_name(self, name):
def set_index_name(self, name, axis=0):
"""
Set index name.
Set index name for the specified axis.
Parameters
----------
name: hashable
name: hashable,
New index name.
axis: int (default 0),
Axis to set name along.
"""
self.index.name = name
self.get_axis(axis).name = name

def get_index_names(self):
def get_index_names(self, axis=0):
"""
Get index names.
Get index names of specified axis.
Parameters
----------
axis: int (default 0),
Axis to return index names on.
Returns
-------
list
Index names.
"""
return self.index.names
return self.get_axis(axis).names

def set_index_names(self, names):
def set_index_names(self, names, axis=0):
"""
Set index names.
Set index names for the specified axis.
Parameters
----------
names: list
names: list,
New index names.
axis: int (default 0),
Axis to set names along.
"""
self.index.names = names
self.get_axis(axis).names = names

# DateTime methods

Expand Down
19 changes: 0 additions & 19 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2969,22 +2969,3 @@ def cat_codes(self):
return self.default_to_pandas(lambda df: df[df.columns[0]].cat.codes)

# END Cat operations

def has_multiindex(self, axis=0):
"""
Check if specified axis is indexed by MultiIndex.
Parameters
----------
axis : 0 or 1, default 0
The axis to check (0 - index, 1 - columns).
Returns
-------
bool
True if index at specified axis is MultiIndex and False otherwise.
"""
if axis == 0:
return isinstance(self.index, pandas.MultiIndex)
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)
22 changes: 14 additions & 8 deletions modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,17 +633,23 @@ def has_multiindex(self, axis=0):
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)

def get_index_name(self):
return self._modin_frame.get_index_name()
def get_index_name(self, axis=0):
return self.columns.name if axis else self._modin_frame.get_index_name()

def set_index_name(self, name):
self._modin_frame = self._modin_frame.set_index_name(name)
def set_index_name(self, name, axis=0):
if axis == 0:
self._modin_frame = self._modin_frame.set_index_name(name)
else:
self.columns.name = name

def get_index_names(self):
return self._modin_frame.get_index_names()
def get_index_names(self, axis=0):
return self.columns.names if axis else self._modin_frame.get_index_names()

def set_index_names(self, names):
self._modin_frame = self._modin_frame.set_index_names(names)
def set_index_names(self, names=None, axis=0):
if axis == 0:
self._modin_frame = self._modin_frame.set_index_names(names)
else:
self.columns.names = names

def free(self):
return
Expand Down
29 changes: 24 additions & 5 deletions modin/experimental/engines/omnisci_on_ray/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,17 @@ def run_and_compare(
**kwargs
):
def run_modin(
fn, data, data2, force_lazy, force_arrow_execute, allow_subqueries, **kwargs
fn,
data,
data2,
force_lazy,
force_arrow_execute,
allow_subqueries,
constructor_kwargs,
**kwargs
):
kwargs["df1"] = pd.DataFrame(data)
kwargs["df2"] = pd.DataFrame(data2)
kwargs["df1"] = pd.DataFrame(data, **constructor_kwargs)
kwargs["df2"] = pd.DataFrame(data2, **constructor_kwargs)
kwargs["df"] = kwargs["df1"]

if force_lazy:
Expand All @@ -76,9 +83,10 @@ def run_modin(

return exp_res

constructor_kwargs = kwargs.pop("constructor_kwargs", {})
try:
kwargs["df1"] = pandas.DataFrame(data)
kwargs["df2"] = pandas.DataFrame(data2)
kwargs["df1"] = pandas.DataFrame(data, **constructor_kwargs)
kwargs["df2"] = pandas.DataFrame(data2, **constructor_kwargs)
kwargs["df"] = kwargs["df1"]
ref_res = fn(lib=pandas, **kwargs)
except Exception as e:
Expand All @@ -90,6 +98,7 @@ def run_modin(
force_lazy=force_lazy,
force_arrow_execute=force_arrow_execute,
allow_subqueries=allow_subqueries,
constructor_kwargs=constructor_kwargs,
**kwargs
)
_ = exp_res.index
Expand All @@ -101,6 +110,7 @@ def run_modin(
force_lazy=force_lazy,
force_arrow_execute=force_arrow_execute,
allow_subqueries=allow_subqueries,
constructor_kwargs=constructor_kwargs,
**kwargs
)
df_equals(ref_res, exp_res)
Expand Down Expand Up @@ -634,6 +644,15 @@ def groupby_mean(df, cols, as_index, **kwargs):

run_and_compare(groupby_mean, data=self.data, cols=cols, as_index=as_index)

def test_groupby_lazy_multiindex(self):
index = generate_multiindex(len(self.data["a"]))

def groupby(df, *args, **kwargs):
df = df + 1
return df.groupby("a").agg({"b": "size"})

run_and_compare(groupby, data=self.data, constructor_kwargs={"index": index})

taxi_data = {
"a": [1, 1, 2, 2],
"b": [11, 21, 12, 11],
Expand Down
12 changes: 4 additions & 8 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,9 @@ def groupby(
elif isinstance(by, str):
drop = by in self.columns
idx_name = by
if (
self._query_compiler.has_multiindex(axis=axis)
and by in self.axes[axis].names
or hasattr(self.axes[axis], "name")
and self.axes[axis].name == by
):
if self._query_compiler.has_multiindex(
axis=axis
) and by in self._query_compiler.get_index_names(axis):
# In this case we pass the string value of the name through to the
# partitions. This is more efficient than broadcasting the values.
pass
Expand Down Expand Up @@ -419,8 +416,7 @@ def groupby(
if mismatch and all(
isinstance(obj, str)
and (
obj in self
or (hasattr(self.index, "names") and obj in self.index.names)
obj in self or obj in self._query_compiler.get_index_names(axis)
)
for obj in by
):
Expand Down
21 changes: 11 additions & 10 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,15 @@ def _shift(periods, freq, axis, fill_value, is_set_nan_rows=True):
)
result.index = pandas.MultiIndex.from_arrays(
new_idx_lvl_arrays,
names=[col_name for col_name in self._by.columns] + [result.index.name],
names=[col_name for col_name in self._by.columns]
+ [result._query_compiler.get_index_name()],
)
result = result.dropna(subset=self._by.columns).sort_index()
else:
result = self._apply_agg_function(
lambda df: df.shift(periods, freq, axis, fill_value)
)
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def nth(self, n, dropna=None):
Expand All @@ -240,7 +241,7 @@ def nth(self, n, dropna=None):
def cumsum(self, axis=0, *args, **kwargs):
result = self._apply_agg_function(lambda df: df.cumsum(axis, *args, **kwargs))
# pandas does not name the index on cumsum
result.index.name = None
result._query_compiler.set_index_name(None)
return result

@property
Expand All @@ -258,7 +259,7 @@ def filter(self, func, dropna=True, *args, **kwargs):
def cummax(self, axis=0, **kwargs):
result = self._apply_agg_function(lambda df: df.cummax(axis, **kwargs))
# pandas does not name the index on cummax
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def apply(self, func, *args, **kwargs):
Expand Down Expand Up @@ -340,7 +341,7 @@ def __getitem__(self, key):
def cummin(self, axis=0, **kwargs):
result = self._apply_agg_function(lambda df: df.cummin(axis=axis, **kwargs))
# pandas does not name the index on cummin
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def bfill(self, limit=None):
Expand Down Expand Up @@ -438,7 +439,7 @@ def mad(self, **kwargs):
def rank(self, **kwargs):
result = self._apply_agg_function(lambda df: df.rank(**kwargs))
# pandas does not name the index on rank
result.index.name = None
result._query_compiler.set_index_name(None)
return result

@property
Expand Down Expand Up @@ -597,7 +598,7 @@ def head(self, n=5):
def cumprod(self, axis=0, *args, **kwargs):
result = self._apply_agg_function(lambda df: df.cumprod(axis, *args, **kwargs))
# pandas does not name the index on cumprod
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def __iter__(self):
Expand All @@ -611,7 +612,7 @@ def transform(self, func, *args, **kwargs):
lambda df: df.transform(func, *args, **kwargs)
)
# pandas does not name the index on transform
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def corr(self, **kwargs):
Expand All @@ -620,7 +621,7 @@ def corr(self, **kwargs):
def fillna(self, **kwargs):
result = self._apply_agg_function(lambda df: df.fillna(**kwargs))
# pandas does not name the index on fillna
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def count(self, **kwargs):
Expand All @@ -641,7 +642,7 @@ def pipe(self, func, *args, **kwargs):
def cumcount(self, ascending=True):
result = self._default_to_pandas(lambda df: df.cumcount(ascending=ascending))
# pandas does not name the index on cumcount
result.index.name = None
result._query_compiler.set_index_name(None)
return result

def tail(self, n=5):
Expand Down

0 comments on commit 1a8cd0a

Please sign in to comment.