[FEA] Support dask correlation and covariance when columns include nulls #10293

Open
@beckernick

Description

Currently, Dask's {DataFrame, Series}.corr codepath relies on calling df.values in cov_corr_chunk. When cudf columns contain null values, that conversion to CuPy raises a ValueError.
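For context, the failure comes from the array conversion rather than from the statistics themselves: cudf's Column.values refuses nullable data, while Frame.to_cupy exposes an na_value escape hatch (both visible in the traceback below). A minimal illustration, assuming a float64 column with nulls and that NaN is an acceptable stand-in for missing values:

import cupy
import cudf

s = cudf.Series([10, None, 3], dtype="float64")

try:
    _ = s.values  # raises because the column contains nulls
except ValueError as err:
    print(err)    # Column must have no nulls.

# to_cupy lets the caller choose a sentinel for nulls instead of raising
print(s.to_cupy(na_value=cupy.nan))  # -> array([10., nan, 3.])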

I'd like to be able to do the following with Dask and have data containing nulls handled appropriately on GPUs and CPUs.

import dask.dataframe as dd
import pandas as pd
import dask_cudf
import cudf

df = pd.DataFrame({
    "a":[0,0,1,1,1,0,1],
    "b":[10,None,3,None,5,4,2],
    "c":[10,-3,3,-6,5,4,-2]
})
gdf = cudf.from_pandas(df)
ddf = dd.from_pandas(df, 2)
gddf = dask_cudf.from_dask_dataframe(ddf)
print(ddf.corr().compute())
print(gddf.corr().compute())
          a         b         c
a  1.000000 -0.644831 -0.356138
b -0.644831  1.000000  0.933122
c -0.356138  0.933122  1.000000
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [51], in <module>
     13 gddf = dask_cudf.from_dask_dataframe(ddf)
     16 print(ddf.corr().compute())
---> 17 print(gddf.corr().compute())

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286     dask.base.compute
    287     """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/threaded.py:79, in get(dsk, result, cache, num_workers, pool, **kwargs)
     76     elif isinstance(pool, multiprocessing.pool.Pool):
     77         pool = MultiprocessingPoolExecutor(pool)
---> 79 results = get_async(
     80     pool.submit,
     81     pool._max_workers,
     82     dsk,
     83     result,
     84     cache=cache,
     85     get_id=_thread_get_id,
     86     pack_exception=pack_exception,
     87     **kwargs,
     88 )
     90 # Cleanup pools associated to dead threads
     91 with pools_lock:

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:507, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    505         _execute_task(task, data)  # Re-execute locally
    506     else:
--> 507         raise_exception(exc, tb)
    508 res, worker_id = loads(res_info)
    509 state["cache"][key] = res

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:315, in reraise(exc, tb)
    313 if exc.__traceback__ is not tb:
    314     raise exc.with_traceback(tb)
--> 315 raise exc

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/dask/dataframe/core.py:6440, in cov_corr_chunk(df, corr)
   6438 shape = (df.shape[1], df.shape[1])
   6439 df = df.astype("float64", copy=False)
-> 6440 sums = np.zeros_like(df.values, shape=shape)
   6441 counts = np.zeros_like(df.values, shape=shape)
   6442 for idx, col in enumerate(df):

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/frame.py:619, in Frame.values(self)
    606 @property
    607 def values(self):
    608     """
    609     Return a CuPy representation of the DataFrame.
    610 
   (...)
    617         The values of the DataFrame.
    618     """
--> 619     return self.to_cupy()

File ~/conda/envs/rapids-22.04/lib/python3.8/contextlib.py:75, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
     72 @wraps(func)
     73 def inner(*args, **kwds):
     74     with self._recreate_cm():
---> 75         return func(*args, **kwds)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/frame.py:719, in Frame.to_cupy(self, dtype, copy, na_value)
    693 @annotate("FRAME_TO_CUPY", color="green", domain="cudf_python")
    694 def to_cupy(
    695     self,
   (...)
    698     na_value=None,
    699 ) -> cupy.ndarray:
    700     """Convert the Frame to a CuPy array.
    701 
    702     Parameters
   (...)
    717     cupy.ndarray
    718     """
--> 719     return self._to_array(
    720         (lambda col: col.values.copy())
    721         if copy
    722         else (lambda col: col.values),
    723         cupy.empty,
    724         dtype,
    725         na_value,
    726     )

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/frame.py:684, in Frame._to_array(self, get_column_values, make_empty_matrix, dtype, na_value)
    677 matrix = make_empty_matrix(
    678     shape=(len(self), ncol), dtype=dtype, order="F"
    679 )
    680 for i, col in enumerate(self._data.values()):
    681     # TODO: col.values may fail if there is nullable data or an
    682     # unsupported dtype. We may want to catch and provide a more
    683     # suitable error.
--> 684     matrix[:, i] = get_column_values_na(col)
    685 return matrix

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/frame.py:663, in Frame._to_array.<locals>.get_column_values_na(col)
    661 if na_value is not None:
    662     col = col.fillna(na_value)
--> 663 return get_column_values(col)

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/frame.py:722, in Frame.to_cupy.<locals>.<lambda>(col)
    693 @annotate("FRAME_TO_CUPY", color="green", domain="cudf_python")
    694 def to_cupy(
    695     self,
   (...)
    698     na_value=None,
    699 ) -> cupy.ndarray:
    700     """Convert the Frame to a CuPy array.
    701 
    702     Parameters
   (...)
    717     cupy.ndarray
    718     """
    719     return self._to_array(
    720         (lambda col: col.values.copy())
    721         if copy
--> 722         else (lambda col: col.values),
    723         cupy.empty,
    724         dtype,
    725         na_value,
    726     )

File ~/conda/envs/rapids-22.04/lib/python3.8/site-packages/cudf/core/column/column.py:155, in ColumnBase.values(self)
    152     return cupy.array([], dtype=self.dtype)
    154 if self.has_nulls():
--> 155     raise ValueError("Column must have no nulls.")
    157 return cupy.asarray(self.data_array_view)

ValueError: Column must have no nulls.
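
Since Frame.to_cupy already accepts an na_value sentinel, one possible direction would be for the chunk-level conversion to request NaN in place of nulls rather than going through .values directly. The helper below is purely illustrative (the name and the dispatch are made up, not a concrete implementation proposal):

import numpy as np

def _values_with_nan(df):
    # Hypothetical sketch, not part of dask or cudf: produce a dense
    # float64 array from a chunk with missing values mapped to NaN.
    df = df.astype("float64", copy=False)
    if hasattr(df, "to_cupy"):
        # cudf path: fill nulls with NaN during the device conversion
        return df.to_cupy(na_value=np.nan)
    # pandas path: to_numpy supports the same sentinel
    return df.to_numpy(na_value=np.nan)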

Environment:


conda list | grep -E "rapids|dask|cupy"

# packages in environment at /home/nicholasb/conda/envs/rapids-22.04:

cucim 22.04.00a220215 cuda_11_py38_g12cc926_22 rapidsai-nightly
cudf 22.04.00a220215 cuda_11_py38_g8b0737d7a4_164 rapidsai-nightly
cudf_kafka 22.04.00a220215 py38_g8b0737d7a4_164 rapidsai-nightly
cugraph 22.04.00a220215 cuda11_py38_g5f971fef_53 rapidsai-nightly
cuml 22.04.00a220215 cuda11_py38_g88e41e858_62 rapidsai-nightly
cupy 9.6.0 py38h177b0fd_0 conda-forge
cusignal 22.04.00a220215 py39_gc620d82_7 rapidsai-nightly
cuspatial 22.04.00a220215 py38_gc63083c_11 rapidsai-nightly
custreamz 22.04.00a220215 py38_g8b0737d7a4_164 rapidsai-nightly
cuxfilter 22.04.00a220215 py38_g97fa691_8 rapidsai-nightly
dask 2022.1.0 pyhd8ed1ab_0 conda-forge
dask-core 2022.1.0 pyhd8ed1ab_0 conda-forge
dask-cuda 22.04.00a220215 py38_15 rapidsai-nightly
dask-cudf 22.04.00a220215 cuda_11_py38_g8b0737d7a4_164 rapidsai-nightly
dask-sql 2022.1.0 py38h578d9bd_0 conda-forge
libcucim 22.04.00a220215 cuda11_g12cc926_22 rapidsai-nightly
libcudf 22.04.00a220215 cuda11_g8b0737d7a4_164 rapidsai-nightly
libcudf_kafka 22.04.00a220215 g8b0737d7a4_164 rapidsai-nightly
libcugraph 22.04.00a220215 cuda11_g5f971fef_53 rapidsai-nightly
libcugraph_etl 22.04.00a220215 cuda11_g5f971fef_53 rapidsai-nightly
libcuml 22.04.00a220215 cuda11_g88e41e858_62 rapidsai-nightly
libcumlprims 22.04.00a220207 cuda11_gde69bdf_11 rapidsai-nightly
libcuspatial 22.04.00a220215 cuda11_gc63083c_11 rapidsai-nightly
librmm 22.04.00a220215 cuda11_g653f331_23 rapidsai-nightly
libxgboost 1.5.2dev.rapidsai22.04 cuda11.2_0 rapidsai-nightly
ptxcompiler 0.2.0 py38h98f4b32_0 rapidsai-nightly
py-xgboost 1.5.2dev.rapidsai22.04 cuda11.2py38_0 rapidsai-nightly
pylibcugraph 22.04.00a220215 cuda11_py38_g5f971fef_53 rapidsai-nightly
rapids 22.04.00a220215 cuda11_py38_g76071bf_87 rapidsai-nightly
rapids-xgboost 22.04.00a220215 cuda11_py38_g76071bf_87 rapidsai-nightly
rmm 22.04.00a220203 cuda11_py38_g0515ca4_16_has_cma rapidsai-nightly
ucx 1.12.0+gd367332 cuda11.2_0 rapidsai-nightly
ucx-proc 1.0.0 gpu rapidsai-nightly
ucx-py 0.25.00a220205 py38_gd367332_4 rapidsai-nightly
xgboost 1.5.2dev.rapidsai22.04 cuda11.2py38_0 rapidsai-nightly

Labels

Python (Affects Python cuDF API), dask (Dask issue), feature request (New feature or request)
