Add best practices page to Dask cuDF docs #16821

Merged
merged 31 commits into branch-24.10 from dask-cudf-best-practices
Sep 20, 2024
Changes from 5 commits
Commits
31 commits
f01fd71
start best practices page for dask-cudf
rjzamora Sep 16, 2024
7aa8041
revisions
rjzamora Sep 17, 2024
b2ce634
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 17, 2024
1e028ea
address code review
rjzamora Sep 18, 2024
3332717
more revisions
rjzamora Sep 18, 2024
eee37f3
more revisions
rjzamora Sep 18, 2024
7c63c7e
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 18, 2024
a425405
add from_map note on meta
rjzamora Sep 18, 2024
9233524
add note on diagnostics
rjzamora Sep 18, 2024
bd144c2
fix typos
rjzamora Sep 18, 2024
5f854e7
tweak wording
rjzamora Sep 19, 2024
397efa7
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
6c8771b
fix map_partitions typo
rjzamora Sep 19, 2024
f7731b8
revisions
rjzamora Sep 19, 2024
581a69f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
8515cb9
fix spelling error and add link to quick-start example
rjzamora Sep 19, 2024
a23deff
replace link to readme
rjzamora Sep 19, 2024
4c1b55d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
8ecd536
add a bit more info about wait and CPU-GPU data movement
rjzamora Sep 20, 2024
251bf23
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
40a638e
update
rjzamora Sep 20, 2024
d082cac
Apply suggestions from code review
rjzamora Sep 20, 2024
8152fca
Apply suggestions from code review
rjzamora Sep 20, 2024
a653a5a
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 20, 2024
91d4fd5
fix lists
rjzamora Sep 20, 2024
d58a5ce
fix func list
rjzamora Sep 20, 2024
59e597a
roll back func change
rjzamora Sep 20, 2024
adbd22d
fix more double-colon mistakes
rjzamora Sep 20, 2024
216d5de
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
d76dbd6
Apply suggestions from code review
rjzamora Sep 20, 2024
da7308a
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
201 changes: 201 additions & 0 deletions docs/dask_cudf/source/best_practices.rst
@@ -0,0 +1,201 @@
.. _best-practices:

Dask cuDF Best Practices
========================

This page outlines several important guidelines for using Dask cuDF
effectively.

.. note::
Since Dask cuDF is a backend extension for
`Dask DataFrame <https://docs.dask.org/en/stable/dataframe.html>`__,
the guidelines discussed in the `Dask DataFrames Best Practices
<https://docs.dask.org/en/stable/dataframe-best-practices.html>`__
documentation also apply to Dask cuDF (excluding any pandas-specific
details).


Deployment and Configuration
----------------------------

Use Dask-CUDA
~~~~~~~~~~~~~

In order to execute a Dask workflow on multiple GPUs, a Dask cluster must
be deployed with `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/>`__
and/or `Dask.distributed <https://distributed.dask.org/en/stable/>`__.

When running on a single machine, the `LocalCUDACluster <https://docs.rapids.ai/api/dask-cuda/stable/api/#dask_cuda.LocalCUDACluster>`__
convenience function is strongly recommended. No matter how many GPUs are
available on the machine (even one!), using `Dask-CUDA has many advantages
<https://docs.rapids.ai/api/dask-cuda/stable/#motivation>`__
over default (threaded) execution. Just to list a few:

* Dask-CUDA makes it easy to pin workers to specific devices.
* Dask-CUDA makes it easy to configure memory-spilling options.
* The distributed scheduler collects useful diagnostic information that can be viewed on a dashboard in real time.

Please see `Dask-CUDA's API <https://docs.rapids.ai/api/dask-cuda/stable/>`__
and `Best Practices <https://docs.rapids.ai/api/dask-cuda/stable/examples/best-practices/>`__
documentation for detailed information.
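
For example, a minimal sketch of a single-machine deployment (assuming
Dask-CUDA and Dask.distributed are installed) could look like this::

    from dask_cuda import LocalCUDACluster
    from distributed import Client

    if __name__ == "__main__":
        # Launch one Dask-CUDA worker per visible GPU
        cluster = LocalCUDACluster()
        client = Client(cluster)
        print(client.dashboard_link)  # real-time diagnostics dashboard
        # ... run the Dask cuDF workflow here ...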

.. note::
When running on cloud infrastructure or HPC systems, it is usually best to
leverage system-specific deployment libraries like `Dask Operator
<https://docs.dask.org/en/latest/deploying-kubernetes.html>`__ and `Dask-Jobqueue
<https://jobqueue.dask.org/en/latest/>`__.

Please see the `RAPIDS deployment documentation <https://docs.rapids.ai/deployment/stable/>`__
for further details and examples.

Enable cuDF Spilling
~~~~~~~~~~~~~~~~~~~~

When using Dask cuDF for classic ETL workloads, it is usually best
to enable `native spilling support in cuDF
<https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#spilling-to-host-memory>`__.
When using :func:`LocalCUDACluster`, this is easily accomplished by
setting ``enable_cudf_spill=True``.
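
For example, a minimal sketch (assuming a single-machine cluster) could
look like this::

    from dask_cuda import LocalCUDACluster
    from distributed import Client

    # Enable native cuDF spilling on every worker
    cluster = LocalCUDACluster(enable_cudf_spill=True)
    client = Client(cluster)

In a single-process (non-cluster) context, the same behavior can usually be
enabled with ``cudf.set_option("spill", True)`` or the ``CUDF_SPILL=on``
environment variable.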

When a Dask cuDF workflow includes conversion between DataFrame and Array
representations, native cuDF spilling may be insufficient. For these cases,
JIT unspilling is likely to produce better protection from out-of-memory
(OOM) errors. Please see `Dask-CUDA's spilling documentation
<https://docs.rapids.ai/api/dask-cuda/24.10/spilling/>`__ for further details
and guidance.

Use RMM
~~~~~~~

Memory allocations in cuDF are significantly faster and more efficient when
the `RAPIDS Memory Manager (RMM) <https://docs.rapids.ai/api/rmm/stable/>`__
library is used on worker processes. In most cases, the best way to manage
memory is by initializing an RMM pool on each worker before executing a
workflow. When using :func:`LocalCUDACluster`, this is easily accomplished
by setting ``rmm_pool_size`` to a large fraction (e.g. ``0.9``).
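
For example, a minimal sketch (assuming a single-machine cluster) could
look like this::

    from dask_cuda import LocalCUDACluster
    from distributed import Client

    cluster = LocalCUDACluster(
        rmm_pool_size=0.9,       # pre-allocate ~90% of each GPU's memory as an RMM pool
        enable_cudf_spill=True,  # see the spilling guidance above
    )
    client = Client(cluster)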

See the `Dask-CUDA memory-management documentation
<https://docs.rapids.ai/api/dask-cuda/nightly/examples/best-practices/#gpu-memory-management>`__
for more details.

Use the Dask DataFrame API
~~~~~~~~~~~~~~~~~~~~~~~~~~

Although Dask cuDF provides a public ``dask_cudf`` Python module, we
strongly recommend that you use the CPU/GPU portable ``dask.dataframe``
API instead. Simply use the `Dask configuration <dask:configuration>`__
system to set the ``"dataframe.backend"`` option to ``"cudf"``, and
the ``dask_cudf`` module will be imported and used implicitly.
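
For example, a minimal sketch (assuming a hypothetical ``"data/"`` Parquet
dataset) could look like this::

    import dask
    import dask.dataframe as dd

    # Tell Dask DataFrame to use the cudf backend;
    # dask_cudf is imported implicitly
    dask.config.set({"dataframe.backend": "cudf"})

    # df is now backed by cudf.DataFrame partitions
    df = dd.read_parquet("data/")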

Reading Data
------------

Tune the partition size
~~~~~~~~~~~~~~~~~~~~~~~

The ideal partition size is usually between 1/16 and 1/8 the memory
capacity of a single GPU. Increasing the partition size will typically
reduce the number of tasks in your workflow and improve the GPU utilization
for each task. However, if the partitions are too large, the risk of OOM
errors can become significant.

.. note::
As a general rule of thumb, aim for 1/16 in shuffle-intensive workflows
(e.g. large-scale sorting and joining), and 1/8 otherwise. For pathologically
skewed data distributions, it may be necessary to target 1/32 or smaller.

The easiest time to tune the partition size is when the DataFrame collection
is first created, using a function like :func:`read_parquet`, :func:`read_csv`,
or :func:`from_map`. For example, both :func:`read_parquet` and :func:`read_csv`
expose a ``blocksize`` argument for adjusting the maximum partition size.

If the partition size cannot be tuned effectively at creation time, the
`repartition <https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html>`__
method can be used as a last resort.
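
For example, a minimal sketch (assuming a hypothetical ``"data/"`` Parquet
dataset and roughly 40 GiB of memory per GPU) could look like this::

    import dask.dataframe as dd

    # Target ~5 GiB partitions (about 1/8 of a 40 GiB GPU) at creation time
    df = dd.read_parquet("data/", blocksize="5GiB")

    # Last resort: halve the partition count to roughly double the partition size
    df = df.repartition(npartitions=df.npartitions // 2)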


Use Parquet
~~~~~~~~~~~

`Parquet <https://parquet.apache.org/docs/file-format/>`__ is the recommended
file format for Dask cuDF. It provides efficient columnar storage and enables
Dask to perform valuable query optimizations like column projection and
predicate pushdown.

The most important arguments to :func:`read_parquet` are ``blocksize`` and
``aggregate_files``:

``blocksize``: Use this argument to specify the maximum partition size.
The default is ``"256 MiB"``, but larger values are usually more performant
on GPUs with more than 8 GiB of memory. Dask will use the ``blocksize``
value to map a discrete number of Parquet row-groups (or files) to each
output partition. This mapping will only account for the uncompressed
storage size of each row group, which is usually smaller than the
corresponding ``cudf.DataFrame``.

``aggregate_files``: Use this argument to specify whether Dask is allowed
to map multiple files to the same DataFrame partition. The default is
``False``, but ``aggregate_files=True`` is usually more performant when
the dataset contains many files that are smaller than half of ``blocksize``.
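
For example, a minimal sketch (assuming a hypothetical dataset made up of
many small Parquet files) could look like this::

    import dask.dataframe as dd

    df = dd.read_parquet(
        "data/",               # hypothetical path
        blocksize="1GiB",      # maximum partition size
        aggregate_files=True,  # allow multiple small files per partition
    )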

.. note::
Metadata collection can be extremely slow when reading from remote
storage (e.g. S3 and GCS). When reading many remote files that all
correspond to a reasonable partition size, it's usually best to set
``blocksize=None`` and ``aggregate_files=False``. In most cases, these
settings allow Dask to skip the metadata-collection stage altogether.

Review comment (Member): I like this note. Once we have more cloud IO specific optimizations it might make sense to add it to best practices or create a new one for cloud IO to discuss tips/tricks for those environments.

Review comment (Member Author): Agree that we need a lot more remote-IO information. However, it doesn't feel like there is much to say yet :/
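
For example, a minimal sketch of the remote-storage settings described in
the note above (assuming a hypothetical S3 dataset in which every file
already corresponds to a reasonable partition size) could look like this::

    import dask.dataframe as dd

    df = dd.read_parquet(
        "s3://bucket/dataset/",  # hypothetical remote path
        blocksize=None,          # do not split files based on row-group metadata
        aggregate_files=False,   # do not combine files into larger partitions
    )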


Use :func:`from_map`
~~~~~~~~~~~~~~~~~~~~

To implement custom DataFrame-creation logic that is not covered by
existing APIs (like :func:`read_parquet`), use :func:`dask.dataframe.from_map`
whenever possible. The :func:`from_map` API has several advantages
over :func:`from_delayed`:

* It allows proper lazy execution of your custom logic
* It enables column projection (as long as the mapped function supports a ``columns`` keyword argument)

See the `from_map API documentation <https://docs.dask.org/en/stable/generated/dask_expr.from_map.html#dask_expr.from_map>`__
for more details.
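
For example, a minimal sketch (assuming hypothetical JSON-lines files and
a custom ``read_chunk`` helper) could look like this::

    import cudf
    import dask.dataframe as dd

    def read_chunk(path, columns=None):
        # Custom creation logic; accepting a ``columns`` keyword argument
        # allows Dask to push column projection into this function
        df = cudf.read_json(path, lines=True)
        return df[columns] if columns is not None else df

    paths = ["part.0.jsonl", "part.1.jsonl"]  # hypothetical file list
    df = dd.from_map(read_chunk, paths)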


Sorting, Joining and Grouping
-----------------------------

Sorting, joining and grouping operations all have the potential to
require the global shuffling of data between distinct partitions.
Review comment (Member): This also applies to repartition, no? Maybe we should mention the same arguments here or point https://github.com/rapidsai/cudf/pull/16821/files#diff-1c3b287013ea5f3b56726f0b7e7538bd0242e8cadcbd89e966a82d3d36719317R93-R95 to here as well, to make it more explicit what users are dealing with in those cases.

Review comment (Member Author): Hmmm. Repartition does not really require data "shuffling". Data shuffling requires "all-to-all", while repartitioning is usually limited to data movement between neighboring partitions.

When the initial data fits comfortably in global GPU memory, these
"all-to-all" operations are typically bound by worker-to-worker
communication. When the data is larger than global GPU memory, the
bottleneck is typically device-to-host memory spilling.

Although every workflow is different, the following guidelines
are often recommended (the ``split_out`` and ``broadcast`` options are sketched in a short code example after this list):

* `Use a distributed cluster with Dask-CUDA workers <Use Dask-CUDA>`_
* `Use native cuDF spilling whenever possible <Enable cuDF Spilling>`_
* Avoid shuffling whenever possible
* Use ``split_out=1`` for low-cardinality groupby aggregations
* Use ``broadcast=True`` for joins when at least one collection comprises a small number of partitions (e.g. ``<=5``)
* `Use UCX <https://docs.rapids.ai/api/dask-cuda/nightly/examples/ucx/>`__ if communication is a bottleneck
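
For example, a minimal sketch of the ``split_out`` and ``broadcast``
guidance above (using small, hypothetical collections) could look like this::

    import dask
    import dask.dataframe as dd

    dask.config.set({"dataframe.backend": "cudf"})

    large = dd.from_dict({"key": [1, 2, 3, 4] * 25, "x": list(range(100))}, npartitions=8)
    small = dd.from_dict({"key": [1, 2, 3, 4], "label": [10, 20, 30, 40]}, npartitions=1)

    # Low-cardinality groupby: keep the aggregated result in a single partition
    result = large.groupby("key").agg({"x": "mean"}, split_out=1)

    # Broadcast join: ``small`` has a single partition, so broadcasting it
    # avoids shuffling ``large``
    joined = large.merge(small, on="key", how="inner", broadcast=True)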

User-defined functions
----------------------

Most real-world Dask DataFrame workflows use `map_partitions
<https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html>`__
to map user-defined functions across every partition of the underlying data.
This API is a fantastic way to apply custom operations in an intuitive and
scalable manner. With that said, the :func:`map_partitions` method will produce
an opaque DataFrame expression that blocks the query-planning `optimizer
<https://docs.dask.org/en/stable/dataframe-optimizer.html>`__ from performing
useful optimizations (like projection and filter pushdown).

Since column-projection pushdown is often the most important optimization,
you can mitigate its loss by explicitly selecting the necessary columns
both before and after calling :func:`map_partitions`.
Adding explicit filter operations may further mitigate the loss of filter
pushdown.
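
For example, a minimal sketch (assuming a hypothetical dataset with columns
``"a"`` and ``"b"``) could look like this::

    import dask.dataframe as dd

    def add_ratio(part):
        # ``part`` is a single cudf.DataFrame partition
        part = part.copy()
        part["ratio"] = part["a"] / part["b"]
        return part

    df = dd.read_parquet("data/")  # hypothetical path

    # Selecting columns before map_partitions lets the optimizer push
    # column projection into read_parquet, and selecting the needed output
    # columns afterward keeps downstream operations small
    out = df[["a", "b"]].map_partitions(add_ratio)[["ratio"]]
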
12 changes: 8 additions & 4 deletions docs/dask_cudf/source/index.rst
@@ -15,7 +15,7 @@ as the ``"cudf"`` dataframe backend for
.. note::
Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU
or multi-node execution on their own. You must also deploy a
`dask.distributed <https://distributed.dask.org/en/stable/>` cluster
`dask.distributed <https://distributed.dask.org/en/stable/>`__ cluster
to leverage multiple GPUs. We strongly recommend using `Dask-CUDA
<https://docs.rapids.ai/api/dask-cuda/stable/>`__ to simplify the
setup of the cluster, taking advantage of all features of the GPU
@@ -29,14 +29,18 @@ minutes to Dask
by `10 minutes to cuDF and Dask cuDF
<https://docs.rapids.ai/api/cudf/stable/user_guide/10min.html>`__.

After reviewing the sections below, please see the
:ref:`Best Practices <best-practices>` page for further guidance on
using Dask cuDF effectively.


Using Dask cuDF
---------------

The Dask DataFrame API (Recommended)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Simply use the `Dask configuration <dask:configuration>` system to
Simply use the `Dask configuration <dask:configuration>`__ system to
set the ``"dataframe.backend"`` option to ``"cudf"``. From Python,
this can be achieved like so::

@@ -112,8 +116,8 @@ performance benefit over the CPU/GPU-portable ``dask.dataframe`` API.
Also, some parts of the explicit API are incompatible with
automatic query planning (see the next section).

The explicit Dask cuDF API
~~~~~~~~~~~~~~~~~~~~~~~~~~
Query Planning
~~~~~~~~~~~~~~

Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+).
As long as the ``"dataframe.query-planning"`` configuration is set to
1 change: 1 addition & 0 deletions python/dask_cudf/README.md
@@ -16,6 +16,7 @@ See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to
## Resources

- [Dask cuDF documentation](https://docs.rapids.ai/api/dask-cudf/stable/)
- [Best practices](https://docs.rapids.ai/api/dask-cudf/stable/best_practices/)
- [cuDF documentation](https://docs.rapids.ai/api/cudf/stable/)
- [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/)
- [Dask-CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/)