Skip to content

vectorized groupby binary ops #5804

Closed
@dcherian

Description

@dcherian

By switching to numpy_groupies we are vectorizing our groupby reductions. I think we can do the same for groupby's binary ops.

Here's an example array

import numpy as np
import xarray as xr

%load_ext memory_profiler

N = 4 * 2000
da = xr.DataArray(
    np.random.random((N, N)),
    dims=("x", "y"),
    coords={"labels": ("x", np.repeat(["a", "b", "c", "d", "e", "f", "g", "h"], repeats=N//8))},
)

Consider this "anomaly" calculation, anomaly defined relative to the group mean

def anom_current(da):
    grouped = da.groupby("labels")
    mean = grouped.mean()
    anom = grouped - mean
    return anom

With this approach, we loop over each group and apply the binary operation:

iterators = []
for arg in args:
if isinstance(arg, GroupBy):
iterator = (value for _, value in arg)
elif hasattr(arg, "dims") and grouped_dim in arg.dims:
if isinstance(arg, Variable):
raise ValueError(
"groupby operations cannot be performed with "
"xarray.Variable objects that share a dimension with "
"the grouped dimension"
)
iterator = _iter_over_selections(arg, grouped_dim, unique_values)
else:
iterator = itertools.repeat(arg)
iterators.append(iterator)
applied = (func(*zipped_args) for zipped_args in zip(*iterators))
applied_example, applied = peek_at(applied)
combine = first_groupby._combine
if isinstance(applied_example, tuple):
combined = tuple(combine(output) for output in zip(*applied))
else:
combined = combine(applied)
return combined

This saves some memory, but becomes slow for large number of groups.

We could instead do

def anom_vectorized(da):
    mean = da.groupby("labels").mean()
    mean_expanded = mean.sel(labels=da.labels)
    anom = da - mean_expanded
    return anom

Now we are faster, but construct an extra array as big as the original array (I think this is an OK tradeoff).

%timeit anom_current(da)

# 1.4 s ± 20.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit anom_vectorized(da)

# 937 ms ± 5.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

(I haven't experimented with dask yet, so the following is just a theory).

I think the real benefit comes with dask. Depending on where the groups are located relative to chunking, we could end up creating a lot of tiny chunks by splitting up existing chunks. With the vectorized approach we can do better.

Ideally we would reindex the "mean" dask array with a numpy-array-of-repeated-ints such that the chunking of mean_expanded exactly matches the chunking of da along the grouped dimension.

In practice, dask.array.take doesn't allow specifying "output chunks" so we'd end up chunking "mean_expanded" based on dask's automatic heuristics, and then rechunking again for the binary operation.

Thoughts?

cc @rabernat

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions