Description
Today, Dask supports several groupby rolling operations on time frequency windows (e.g., "1 hour"), including canonical reductions like sum, max, min, mean, std, and var.
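For reference, what a groupby rolling reduction over a time window computes can be sketched in plain Python. This is an illustration only, not how Dask or cuDF implement it: `groupby_rolling_mean` is a hypothetical helper, and the right-closed trailing window `(t - window, t]` is an assumption meant to mirror the pandas default for offset windows.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def groupby_rolling_mean(rows, window):
    """Rolling mean per group over a trailing time window.

    rows: iterable of (timestamp, group_key, value) tuples.
    window: timedelta; each row's window is (timestamp - window, timestamp].
    Returns a dict mapping (group_key, timestamp) to the windowed mean.
    Assumes timestamps are unique within a group.
    """
    by_group = defaultdict(list)
    for ts, key, value in rows:
        by_group[key].append((ts, value))

    result = {}
    for key, series in by_group.items():
        series.sort(key=lambda pair: pair[0])  # order each group by timestamp
        for ts, _ in series:
            # Collect the values whose timestamps fall inside the trailing window.
            in_window = [v for t, v in series if ts - window < t <= ts]
            result[(key, ts)] = sum(in_window) / len(in_window)
    return result


start = datetime(2000, 1, 1)
rows = [
    (start, "Zelda", 1.0),
    (start + timedelta(hours=1), "Sarah", 10.0),
    (start + timedelta(hours=2), "Zelda", 3.0),
    (start + timedelta(hours=30), "Zelda", 5.0),
]
means = groupby_rolling_mean(rows, timedelta(days=1))
for (name, ts), value in sorted(means.items()):
    print(name, ts, value)
```

In this toy data the Zelda row at 02:00 averages 1.0 and 3.0, while the Zelda row 30 hours in sees only its own value because the earlier rows have left the one-day window.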
These operations are not yet supported in dask-cuDF, and we should add support for them.
With dask-cuDF, they currently fail with the following error:
import dask
import dask_cudf
ddf = dask.datasets.timeseries(freq="1H")
gddf = dask_cudf.from_dask_dataframe(ddf)
print(ddf.groupby("name").x.rolling("1D").var().head())
gddf.groupby("name").x.rolling('1D').mean().head()
name timestamp
Zelda 2000-01-01 00:00:00 NaN
Sarah 2000-01-01 01:00:00 NaN
George 2000-01-01 02:00:00 NaN
Norbert 2000-01-01 03:00:00 NaN
Wendy 2000-01-01 04:00:00 NaN
Name: x, dtype: float64
/home/nicholasb/conda/envs/cudf-22.04/lib/python3.8/site-packages/dask_cudf/groupby.py:54: FutureWarning: index is deprecated and will be removed in a future release.
by=self.index,
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [16], in <module>
5 gddf = dask_cudf.from_dask_dataframe(ddf)
7 print(ddf.groupby("name").x.rolling("1D").var().head())
----> 9 gddf.groupby("name").x.rolling('1D').mean().head()
File ~/conda/envs/cudf-22.04/lib/python3.8/site-packages/dask/dataframe/rolling.py:352, in Rolling.mean(self)
350 @derived_from(pd_Rolling)
351 def mean(self):
--> 352 return self._call_method("mean")
File ~/conda/envs/cudf-22.04/lib/python3.8/site-packages/dask/dataframe/rolling.py:492, in RollingGroupby._call_method(self, method_name, *args, **kwargs)
491 def _call_method(self, method_name, *args, **kwargs):
--> 492 return super()._call_method(
493 method_name,
494 *args,
495 groupby_kwargs=self._groupby_kwargs,
496 groupby_slice=self._groupby_slice,
497 **kwargs,
498 )
File ~/conda/envs/cudf-22.04/lib/python3.8/site-packages/dask/dataframe/rolling.py:300, in Rolling._call_method(self, method_name, *args, **kwargs)
298 def _call_method(self, method_name, *args, **kwargs):
299 rolling_kwargs = self._rolling_kwargs()
--> 300 meta = self.pandas_rolling_method(
301 self.obj._meta_nonempty, rolling_kwargs, method_name, *args, **kwargs
302 )
304 if self._has_single_partition:
305 # There's no overlap just use map_partitions
306 return self.obj.map_partitions(
307 self.pandas_rolling_method,
308 rolling_kwargs,
(...)
313 **kwargs,
314 )
File ~/conda/envs/cudf-22.04/lib/python3.8/site-packages/dask/dataframe/rolling.py:488, in RollingGroupby.pandas_rolling_method(df, rolling_kwargs, name, groupby_kwargs, groupby_slice, *args, **kwargs)
486 if groupby_slice:
487 groupby = groupby[groupby_slice]
--> 488 rolling = groupby.rolling(**rolling_kwargs)
489 return getattr(rolling, name)(*args, **kwargs).sort_index(level=-1)
File ~/conda/envs/cudf-22.04/lib/python3.8/site-packages/cudf/core/groupby/groupby.py:729, in GroupBy.rolling(self, *args, **kwargs)
720 def rolling(self, *args, **kwargs):
721 """
722 Returns a `RollingGroupby` object that enables rolling window
723 calculations on the groups.
(...)
727 cudf.core.window.Rolling
728 """
--> 729 return cudf.core.window.rolling.RollingGroupby(self, *args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'win_type'
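The traceback shows Dask forwarding its rolling keyword arguments, including `win_type`, to a constructor that does not accept that keyword. The sketch below reproduces that mechanism with a stand-in class and shows one possible way to handle it: drop unsupported keywords only when they carry the default `None`, and fail loudly otherwise. `StubRollingGroupby` and `supported_kwargs` are hypothetical names, the contents of `rolling_kwargs` are assumed for illustration, and none of this is the actual cuDF or Dask code or a proposed fix.

```python
import inspect


class StubRollingGroupby:
    """Stand-in for a rolling class whose __init__ has no `win_type` parameter."""

    def __init__(self, window, min_periods=None, center=False):
        self.window = window
        self.min_periods = min_periods
        self.center = center


def supported_kwargs(cls, kwargs):
    """Return only the kwargs that cls.__init__ accepts.

    An unsupported keyword is dropped silently only when its value is None;
    any other value raises, so behaviour is never changed without notice.
    """
    accepted = inspect.signature(cls.__init__).parameters
    kept = {}
    for name, value in kwargs.items():
        if name in accepted:
            kept[name] = value
        elif value is not None:
            raise NotImplementedError(
                f"{cls.__name__} does not support {name}={value!r}"
            )
    return kept


# Assumed shape of the forwarded kwargs; only `win_type` is confirmed by the traceback.
rolling_kwargs = {
    "window": "1D",
    "min_periods": None,
    "center": False,
    "win_type": None,
}

# Forwarding everything reproduces the same kind of TypeError.
try:
    StubRollingGroupby(**rolling_kwargs)
except TypeError as exc:
    print("fails:", exc)

# Filtering first lets construction succeed.
rolling = StubRollingGroupby(**supported_kwargs(StubRollingGroupby, rolling_kwargs))
print("ok:", rolling.window)
```

Raising on a non-`None` unsupported value is a deliberate choice in this sketch: a request such as a weighted window should surface as a clear error instead of being silently computed as an unweighted one.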
Env:
conda list | grep "rapids\|dask"
cudf 22.04.00a220131 cuda_11_py38_gc25d35b361_93 rapidsai-nightly
dask 2022.1.0 pyhd8ed1ab_0 conda-forge
dask-core 2022.1.0 pyhd8ed1ab_0 conda-forge
dask-cudf 22.04.00a220131 cuda_11_py38_gc25d35b361_93 rapidsai-nightly
libcudf 22.04.00a220131 cuda11_gc25d35b361_93 rapidsai-nightly
librmm 22.04.00a220131 cuda11_g81d523a_15 rapidsai-nightly
ptxcompiler 0.2.0 py38h98f4b32_0 rapidsai-nightly
rmm 22.04.00a220131 cuda11_py38_g81d523a_15_has_cma rapidsai-nightly