Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support expression-based Dask Dataframe API #5835

Merged
merged 9 commits into from
Apr 10, 2024
4 changes: 2 additions & 2 deletions ci/test_wheel.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.

set -euo pipefail

Expand Down Expand Up @@ -27,7 +27,7 @@ else
./ci/run_cuml_singlegpu_pytests.sh \
--numprocesses=8 \
--dist=worksteal \
-k 'not test_sparse_pca_inputs' \
-k 'not test_sparse_pca_inputs and not test_fil_skl_classification' \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cuml.xml"

# Run test_sparse_pca_inputs separately
Expand Down
6 changes: 5 additions & 1 deletion python/cuml/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dask import config

from cuml.dask import cluster
from cuml.dask import common
Expand All @@ -27,6 +28,9 @@
from cuml.dask import preprocessing
from cuml.dask import solvers

# Avoid "p2p" shuffling in dask for now by forcing the task-based
# shuffle method (this config is applied when cuml.dask is imported)
config.set({"dataframe.shuffle.method": "tasks"})

__all__ = [
"cluster",
"common",
Expand Down
6 changes: 3 additions & 3 deletions python/cuml/dask/common/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@

from distributed.client import Future
from functools import wraps
from dask_cudf.core import Series as dcSeries
from dask_cudf import Series as dcSeries
from cuml.internals.safe_imports import gpu_only_import_from
from cuml.internals.base import Base
from cuml.internals import BaseMetaClass
Expand All @@ -37,7 +37,7 @@


dask_cudf = gpu_only_import("dask_cudf")
dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")


class BaseEstimator(object, metaclass=BaseMetaClass):
Expand Down
6 changes: 3 additions & 3 deletions python/cuml/dask/common/input_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
from cuml.dask.common.dask_arr_utils import validate_dask_array
from cuml.dask.common.dask_df_utils import to_dask_cudf
from cuml.dask.common.utils import get_client
from dask_cudf.core import Series as dcSeries
from dask_cudf import Series as dcSeries
from dask.dataframe import Series as daskSeries
from dask.dataframe import DataFrame as daskDataFrame
from cudf import Series
Expand All @@ -43,7 +43,7 @@


DataFrame = gpu_only_import_from("cudf", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")


class DistributedDataHandler:
Expand Down
6 changes: 3 additions & 3 deletions python/cuml/dask/common/part_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
#

from cuml.dask.common.utils import parse_host_port
from dask_cudf.core import Series as dcSeries
from dask_cudf import Series as dcSeries
from cuml.internals.safe_imports import gpu_only_import_from
from dask.dataframe import Series as daskSeries
from dask.dataframe import DataFrame as daskDataFrame
Expand All @@ -30,7 +30,7 @@
np = cpu_only_import("numpy")


dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")


def hosts_to_parts(futures):
Expand Down
11 changes: 8 additions & 3 deletions python/cuml/dask/neighbors/kneighbors_classifier.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -111,9 +111,14 @@ def fit(self, X, y):
if isinstance(y, DaskSeries):
uniq_labels.append(y.unique())
else:
n_targets = len(y.columns)
# Dask-expr does not support numerical column names, so convert to
# a legacy dataframe (when available) before positional column access.
# See: https://github.com/dask/dask-expr/issues/1015
_y = y
if hasattr(y, "to_legacy_dataframe"):
_y = y.to_legacy_dataframe()
n_targets = len(_y.columns)
for i in range(n_targets):
uniq_labels.append(y.iloc[:, i].unique())
uniq_labels.append(_y.iloc[:, i].unique())

uniq_labels = da.compute(uniq_labels)[0]
if hasattr(uniq_labels[0], "values_host"): # for cuDF Series
Expand Down
8 changes: 4 additions & 4 deletions python/cuml/dask/preprocessing/LabelEncoder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,6 @@
#
from cuml.preprocessing import LabelEncoder as LE
from cuml.common.exceptions import NotFittedError
from dask_cudf.core import Series as daskSeries
from cuml.dask.common.base import BaseEstimator
from cuml.dask.common.base import DelayedTransformMixin
from cuml.dask.common.base import DelayedInverseTransformMixin
Expand All @@ -24,7 +23,8 @@
from collections.abc import Sequence
from cuml.internals.safe_imports import gpu_only_import_from

dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")
dcSeries = gpu_only_import_from("dask_cudf", "Series")


class LabelEncoder(
Expand Down Expand Up @@ -148,7 +148,7 @@ def fit(self, y):
_classes = y.unique().compute().sort_values(ignore_index=True)
el = first(y) if isinstance(y, Sequence) else y
self.datatype = (
"cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy"
"cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy"
)
self._set_internal_model(LE(**self.kwargs).fit(y, _classes=_classes))
return self
Expand Down
10 changes: 5 additions & 5 deletions python/cuml/dask/preprocessing/encoders.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,11 +21,11 @@
DelayedTransformMixin,
)
from cuml.internals.safe_imports import gpu_only_import_from, gpu_only_import
from dask_cudf.core import Series as daskSeries
from toolz import first

dask_cudf = gpu_only_import("dask_cudf")
dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")
dcSeries = gpu_only_import_from("dask_cudf", "Series")


class DelayedFitTransformMixin:
Expand Down Expand Up @@ -123,7 +123,7 @@ def fit(self, X):

el = first(X) if isinstance(X, Sequence) else X
self.datatype = (
"cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy"
"cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy"
)

self._set_internal_model(OneHotEncoderMG(**self.kwargs).fit(X))
Expand Down Expand Up @@ -233,7 +233,7 @@ def fit(self, X):

el = first(X) if isinstance(X, Sequence) else X
self.datatype = (
"cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy"
"cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy"
)

self._set_internal_model(OrdinalEncoderMG(**self.kwargs).fit(X))
Expand Down
Loading