Open
Description
Today, cuDF Python supports groupby.corr and has a PR in review for groupby.cov. I'd like to be able to use these with Dask on GPUs, just as I can with Dask on CPUs (note that the distributed algorithm doesn't actually rely on these primitives).
The Dask implementation appears to face some challenges when running with a cuDF backend, including:
- Hardcoded pandas return types in some functions
- Passing keyword arguments to groupby.apply (which cuDF doesn't currently support)
- Using groupby.apply(arbitrary_func) with an arbitrary Python function, which can be slow in cuDF
import dask.dataframe as dd
import pandas as pd
import dask_cudf
import cudf
df = pd.DataFrame({
"a": [0,0,1,0,0,1,1,1,1,1],
"b": [0,0.3,0,-4,0,-3,1,1,11,1],
"c": [19,0,30,0,0,1,41,1,1,1],
})
gdf = cudf.from_pandas(df)
ddf = dd.from_pandas(df, 2)
gddf = dask_cudf.from_dask_dataframe(ddf)
print(ddf.groupby("a").corr().compute())
print(gddf.groupby("a").corr().compute())
b c
a
0 b 1.000000 0.300100
c 0.300100 1.000000
1 b 1.000000 -0.200625
c -0.200625 1.000000
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/utils.py:176, in raise_on_meta_error(funcname, udf)
175 try:
--> 176 yield
177 except Exception as e:
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:5984, in _emulate(func, udf, *args, **kwargs)
5983 with raise_on_meta_error(funcname(func), udf=udf):
-> 5984 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:447, in _cov_chunk(df, *by)
445 x = g.sum()
--> 447 mul = g.apply(_mul_cols, cols=cols).reset_index(level=-1, drop=True)
449 n = g[x.columns].count().rename(columns=lambda c: f"{c}-count")
TypeError: apply() got an unexpected keyword argument 'cols'
The above exception was the direct cause of the following exception:
ValueError Traceback (most recent call last)
Input In [85], in <module>
13 gddf = dask_cudf.from_dask_dataframe(ddf)
15 print(ddf.groupby("a").corr().compute())
---> 16 print(gddf.groupby("a").corr().compute())
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:1475, in _GroupBy.corr(self, ddof, split_every, split_out)
1470 @derived_from(pd.DataFrame)
1471 def corr(self, ddof=1, split_every=None, split_out=1):
1472 """Groupby correlation:
1473 corr(X, Y) = cov(X, Y) / (std_x * std_y)
1474 """
-> 1475 return self.cov(split_every=split_every, split_out=split_out, std=True)
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py:1500, in _GroupBy.cov(self, ddof, split_every, split_out, std)
1497 sliced_plus = list(self._slice) + list(self.by)
1498 self.obj = self.obj[sliced_plus]
-> 1500 result = aca(
1501 [self.obj, self.by]
1502 if not isinstance(self.by, list)
1503 else [self.obj] + self.by,
1504 chunk=_cov_chunk,
1505 aggregate=_cov_agg,
1506 combine=_cov_combine,
1507 token=self._token_prefix + "cov",
1508 aggregate_kwargs={"ddof": ddof, "levels": levels, "std": std},
1509 combine_kwargs={"levels": levels},
1510 split_every=split_every,
1511 split_out=split_out,
1512 split_out_setup=split_out_on_index,
1513 sort=self.sort,
1514 )
1516 if isinstance(self.obj, Series):
1517 result = result[result.columns[0]]
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:5935, in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
5932 dsk[(b, j)] = (aggregate, conc)
5934 if meta is no_default:
-> 5935 meta_chunk = _emulate(chunk, *args, udf=True, **chunk_kwargs)
5936 meta = _emulate(
5937 aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs
5938 )
5939 meta = make_meta(
5940 meta,
5941 index=(getattr(make_meta(dfs[0]), "index", None) if dfs else None),
5942 parent_meta=dfs[0]._meta,
5943 )
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:5984, in _emulate(func, udf, *args, **kwargs)
5979 """
5980 Apply a function using args / kwargs. If arguments contain dd.DataFrame /
5981 dd.Series, using internal cache (``_meta``) for calculation
5982 """
5983 with raise_on_meta_error(funcname(func), udf=udf):
-> 5984 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File ~/conda/envs/rapids-22.04/lib/python3.8/contextlib.py:131, in _GeneratorContextManager.__exit__(self, type, value, traceback)
129 value = type()
130 try:
--> 131 self.gen.throw(type, value, traceback)
132 except StopIteration as exc:
133 # Suppress StopIteration *unless* it's the same exception that
134 # was passed to throw(). This prevents a StopIteration
135 # raised inside the "with" statement from being suppressed.
136 return exc is not value
File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/utils.py:197, in raise_on_meta_error(funcname, udf)
188 msg += (
189 "Original error is below:\n"
190 "------------------------\n"
(...)
194 "{2}"
195 )
196 msg = msg.format(f" in `{funcname}`" if funcname else "", repr(e), tb)
--> 197 raise ValueError(msg) from e
ValueError: Metadata inference failed in `_cov_chunk`.
You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.
To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.
Original error is below:
------------------------
TypeError("apply() got an unexpected keyword argument 'cols'")
Traceback:
---------
File "/home/nicholasb/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/utils.py", line 176, in raise_on_meta_error
yield
File "/home/nicholasb/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py", line 5984, in _emulate
return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
File "/home/nicholasb/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/groupby.py", line 447, in _cov_chunk
mul = g.apply(_mul_cols, cols=cols).reset_index(level=-1, drop=True)
Metadata
Metadata
Assignees
Type
Projects
Status
Todo