
[BUG] Dask cuDF cummax and cummin fail due to missing axis argument in cuDF.Series.where #10104

Open
@beckernick

Description

@beckernick

Dask cuDF cummax and cummin fail when npartitions > 1 because Dask's cumulative aggregation step calls Series.where with an axis keyword argument that cuDF's Series.where does not accept.

import cudf
import dask_cudf

df = cudf.DataFrame({"x": range(10)})
ddf = dask_cudf.from_cudf(df, 2)

ddf.x.cummin().compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [1], in <module>
      4 df = cudf.DataFrame({"x": range(10)})
      5 ddf = dask_cudf.from_cudf(df, 2)
----> 7 ddf.x.cummin().compute()

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286     dask.base.compute
    287     """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:553, in get_sync(dsk, keys, **kwargs)
    548 """A naive synchronous version of get_async
    549 
    550 Can be useful for debugging.
    551 """
    552 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553 return get_async(
    554     synchronous_executor.submit,
    555     synchronous_executor._max_workers,
    556     dsk,
    557     keys,
    558     **kwargs,
    559 )

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:496, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494 while state["waiting"] or state["ready"] or state["running"]:
    495     fire_tasks(chunksize)
--> 496     for key, res_info, failed in queue_get(queue).result():
    497         if failed:
    498             exc, tb = loads(res_info)

File ~/conda/envs/rapids-22.02/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/conda/envs/rapids-22.02/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:538, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    536 fut = Future()
    537 try:
--> 538     fut.set_result(fn(*args, **kwargs))
    539 except BaseException as e:
    540     fut.set_exception(e)

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:234, in batch_execute_tasks(it)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:234, in <listcomp>(.0)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:225, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223     failed = False
    224 except BaseException as e:
--> 225     result = pack_exception(e, dumps)
    226     failed = True
    227 return key, result, failed

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/dataframe/methods.py:314, in cummin_aggregate(x, y)
    312 def cummin_aggregate(x, y):
    313     if is_series_like(x) or is_dataframe_like(x):
--> 314         return x.where((x < y) | x.isnull(), y, axis=x.ndim - 1)
    315     else:  # scalar
    316         return x if x < y else y

TypeError: where() got an unexpected keyword argument 'axis'
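For context, pandas' Series.where accepts an axis keyword (effectively a no-op for a one-dimensional Series), which is what Dask's generic cummin_aggregate relies on when it passes axis=x.ndim - 1; cuDF's Series.where has no such parameter, hence the TypeError above. A minimal sketch of the mismatch outside of Dask (the cuDF behavior here assumes the 22.02 nightly listed in the environment below):

import cudf
import pandas as pd

pd_s = pd.Series([3, 1, 2])
gpu_s = cudf.Series([3, 1, 2])

# pandas accepts axis for a Series (it is ignored), so Dask's aggregate step works:
pd_s.where(pd_s < 2, 0, axis=0)

# cuDF's Series.where does not take axis, so the equivalent call fails with
# "TypeError: where() got an unexpected keyword argument 'axis'":
gpu_s.where(gpu_s < 2, 0, axis=0)

This would also explain why the failure only appears with npartitions > 1: with a single partition there is no cross-partition aggregation, so cummin_aggregate (and its where call) is presumably never reached.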

Env:

!conda list | grep "rapids\|dask"
# packages in environment at /home/nicholasb/conda/envs/rapids-22.02:
cucim                     22.02.00a220121         cuda_11_py38_g0e199fc_37         rapidsai-nightly
cudf                      22.02.00a220121         cuda_11_py38_g53a31d1b01_303     rapidsai-nightly
cudf_kafka                22.02.00a220121         py38_g53a31d1b01_303             rapidsai-nightly
cugraph                   22.02.00a220119         cuda11_py38_gefbff09a_70         rapidsai-nightly
cuml                      22.02.00a220118         cuda11_py38_g592834c13_88        rapidsai-nightly
cusignal                  22.02.00a220121         py39_g80eadba_11                 rapidsai-nightly
cuspatial                 22.02.00a220121         py38_g5d619e7_19                 rapidsai-nightly
custreamz                 22.02.00a220121         py38_g53a31d1b01_303             rapidsai-nightly
cuxfilter                 22.02.00a220121         py38_g4287bac_13                 rapidsai-nightly
dask                      2021.11.2               pyhd8ed1ab_0                     conda-forge
dask-core                 2021.11.2               pyhd8ed1ab_0                     conda-forge
dask-cuda                 22.02.00a220121         py38_49                          rapidsai-nightly
dask-cudf                 22.02.00a220121         cuda_11_py38_g53a31d1b01_303     rapidsai-nightly
libcucim                  22.02.00a220121         cuda11_g0e199fc_37               rapidsai-nightly
libcudf                   22.02.00a220121         cuda11_g53a31d1b01_303           rapidsai-nightly
libcudf_kafka             22.02.00a220121         g53a31d1b01_303                  rapidsai-nightly
libcugraph                22.02.00a220119         cuda11_gefbff09a_70              rapidsai-nightly
libcugraph_etl            22.02.00a220121         cuda11_gc0096791_73              rapidsai-nightly
libcuml                   22.02.00a220118         cuda11_g592834c13_88             rapidsai-nightly
libcumlprims              22.02.00a220119         cuda11_g0342bdb_15               rapidsai-nightly
libcuspatial              22.02.00a220121         cuda11_g5d619e7_19               rapidsai-nightly
librmm                    22.02.00a220121         cuda11_g30eb83b_31               rapidsai-nightly
libxgboost                1.5.0dev.rapidsai22.02  cuda11.2_0                       rapidsai-nightly
ptxcompiler               0.2.0                   py38hb739d79_0                   rapidsai-nightly
py-xgboost                1.5.0dev.rapidsai22.02  cuda11.2py38_0                   rapidsai-nightly
pylibcugraph              22.02.00a220119         cuda11_py38_gefbff09a_70         rapidsai-nightly
rapids                    22.02.00a220118         cuda11_py38_g3986715_134         rapidsai-nightly
rapids-xgboost            22.02.00a220118         cuda11_py38_g3986715_134         rapidsai-nightly
rmm                       22.02.00a220121         cuda11_py38_g30eb83b_31_has_cma  rapidsai-nightly
ucx                       1.12.0+gd367332         cuda11.2_0                       rapidsai-nightly
ucx-proc                  1.0.0                   gpu                              rapidsai-nightly
ucx-py                    0.24.0a220121           py38_gd367332_26                 rapidsai-nightly
xgboost                   1.5.0dev.rapidsai22.02  cuda11.2py38_0                   rapidsai-nightly

Labels

Python (Affects Python cuDF API), bug (Something isn't working), dask (Dask issue)
