Skip to content

[FEA] Dask support for groupby correlation and covariance #10294

Open
@beckernick

Description

@beckernick

Today, cuDF Python supports groupby.corr and has a PR in review for groupby.cov. I'd like to be able to use these with Dask on GPUs, like I can with Dask on CPUs (note that the distributed algorithm doesn't actually rely on these primitives).

The Dask implementation appears to have some challenges running with a cuDF backend, including:

  • Hardcoded pandas return types in some functions
  • Passing arguments to groupby.apply (which cuDF doesn't currently support)
  • Using groupby.apply(arbitrary_func), which can be slow in cuDF
import dask.dataframe as dd
import pandas as pd
import dask_cudf
import cudf

df = pd.DataFrame({
    "a": [0,0,1,0,0,1,1,1,1,1],
    "b": [0,0.3,0,-4,0,-3,1,1,11,1],
    "c": [19,0,30,0,0,1,41,1,1,1],
})
gdf = cudf.from_pandas(df)
ddf = dd.from_pandas(df, 2)
gddf = dask_cudf.from_dask_dataframe(ddf)
print(ddf.groupby("a").corr().compute())
print(gddf.groupby("a").corr().compute())
            b         c
a                      
0 b  1.000000  0.300100
  c  0.300100  1.000000
1 b  1.000000 -0.200625
  c -0.200625  1.000000
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/utils.py:176, in raise_on_meta_error(funcname, udf)
    175 try:
--> 176     yield
    177 except Exception as e:

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:5984, in _emulate(func, udf, *args, **kwargs)
   5983 with raise_on_meta_error(funcname(func), udf=udf):
-> 5984     return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:447, in _cov_chunk(df, *by)
    445 x = g.sum()
--> 447 mul = g.apply(_mul_cols, cols=cols).reset_index(level=-1, drop=True)
    449 n = g[x.columns].count().rename(columns=lambda c: f"{c}-count")

TypeError: apply() got an unexpected keyword argument 'cols'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Input In [85], in <module>
     13 gddf = dask_cudf.from_dask_dataframe(ddf)
     15 print(ddf.groupby("a").corr().compute())
---> 16 print(gddf.groupby("a").corr().compute())

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:1475, in _GroupBy.corr(self, ddof, split_every, split_out)
   1470 @derived_from(pd.DataFrame)
   1471 def corr(self, ddof=1, split_every=None, split_out=1):
   1472     """Groupby correlation:
   1473     corr(X, Y) = cov(X, Y) / (std_x * std_y)
   1474     """
-> 1475     return self.cov(split_every=split_every, split_out=split_out, std=True)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:1500, in _GroupBy.cov(self, ddof, split_every, split_out, std)
   1497         sliced_plus = list(self._slice) + list(self.by)
   1498         self.obj = self.obj[sliced_plus]
-> 1500 result = aca(
   1501     [self.obj, self.by]
   1502     if not isinstance(self.by, list)
   1503     else [self.obj] + self.by,
   1504     chunk=_cov_chunk,
   1505     aggregate=_cov_agg,
   1506     combine=_cov_combine,
   1507     token=self._token_prefix + "cov",
   1508     aggregate_kwargs={"ddof": ddof, "levels": levels, "std": std},
   1509     combine_kwargs={"levels": levels},
   1510     split_every=split_every,
   1511     split_out=split_out,
   1512     split_out_setup=split_out_on_index,
   1513     sort=self.sort,
   1514 )
   1516 if isinstance(self.obj, Series):
   1517     result = result[result.columns[0]]

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:5935, in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5932         dsk[(b, j)] = (aggregate, conc)
   5934 if meta is no_default:
-> 5935     meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
   5936     meta = _emulate(
   5937         aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs
   5938     )
   5939 meta = make_meta(
   5940     meta,
   5941     index=(getattr(make_meta(dfs[0]), "index", None) if dfs else None),
   5942     parent_meta=dfs[0]._meta,
   5943 )

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:5984, in _emulate(func, udf, *args, **kwargs)
   5979 """
   5980 Apply a function using args / kwargs. If arguments contain dd.DataFrame /
   5981 dd.Series, using internal cache (``_meta``) for calculation
   5982 """
   5983 with raise_on_meta_error(funcname(func), udf=udf):
-> 5984     return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

File ~/conda/envs/rapids-22.04/lib/python3.8/contextlib.py:131, in _GeneratorContextManager.__exit__(self, type, value, traceback)
    129     value = type()
    130 try:
--> 131     self.gen.throw(type, value, traceback)
    132 except StopIteration as exc:
    133     # Suppress StopIteration *unless* it's the same exception that
    134     # was passed to throw().  This prevents a StopIteration
    135     # raised inside the "with" statement from being suppressed.
    136     return exc is not value

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/utils.py:197, in raise_on_meta_error(funcname, udf)
    188 msg += (
    189     "Original error is below:\n"
    190     "------------------------\n"
   (...)
    194     "{2}"
    195 )
    196 msg = msg.format(f" in `{funcname}`" if funcname else "", repr(e), tb)
--> 197 raise ValueError(msg) from e

ValueError: Metadata inference failed in `_cov_chunk`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
TypeError("apply() got an unexpected keyword argument 'cols'")

Traceback:
---------
  File "/home/nicholasb/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/utils.py", line 176, in raise_on_meta_error
    yield
  File "/home/nicholasb/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py", line 5984, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/home/nicholasb/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py", line 447, in _cov_chunk
    mul = g.apply(_mul_cols, cols=cols).reset_index(level=-1, drop=True)

Metadata

Metadata

Assignees

No one assigned

    Labels

    Python (Affects Python cuDF API) · dask (Dask issue) · feature request (New feature or request)

    Type

    No type

    Projects

    Status

    Todo

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions