
Add best practices page to Dask cuDF docs #16821

Merged
31 commits merged on Sep 20, 2024
Changes from 1 commit
Commits (31)
f01fd71
start best practices page for dask-cudf
rjzamora Sep 16, 2024
7aa8041
revisions
rjzamora Sep 17, 2024
b2ce634
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 17, 2024
1e028ea
address code review
rjzamora Sep 18, 2024
3332717
more revisions
rjzamora Sep 18, 2024
eee37f3
more revisions
rjzamora Sep 18, 2024
7c63c7e
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 18, 2024
a425405
add from_map note on meta
rjzamora Sep 18, 2024
9233524
add note on diagnostics
rjzamora Sep 18, 2024
bd144c2
fix typos
rjzamora Sep 18, 2024
5f854e7
tweak wording
rjzamora Sep 19, 2024
397efa7
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
6c8771b
fix map_partitions typo
rjzamora Sep 19, 2024
f7731b8
revisions
rjzamora Sep 19, 2024
581a69f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
8515cb9
fix spelling error and add link to quick-start example
rjzamora Sep 19, 2024
a23deff
replace link to readme
rjzamora Sep 19, 2024
4c1b55d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
8ecd536
add a bit more info about wait and CPU-GPU data movement
rjzamora Sep 20, 2024
251bf23
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
40a638e
update
rjzamora Sep 20, 2024
d082cac
Apply suggestions from code review
rjzamora Sep 20, 2024
8152fca
Apply suggestions from code review
rjzamora Sep 20, 2024
a653a5a
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 20, 2024
91d4fd5
fix lists
rjzamora Sep 20, 2024
d58a5ce
fix func list
rjzamora Sep 20, 2024
59e597a
roll back func change
rjzamora Sep 20, 2024
adbd22d
fix more double-colon mistakes
rjzamora Sep 20, 2024
216d5de
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
d76dbd6
Apply suggestions from code review
rjzamora Sep 20, 2024
da7308a
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
start best practices page for dask-cudf
rjzamora committed Sep 16, 2024
commit f01fd7102cfb08ad8adc76cfc753affd394f31f6
135 changes: 135 additions & 0 deletions docs/dask_cudf/source/best_practices.rst
@@ -0,0 +1,135 @@
.. _best-practices:

Dask cuDF Best Practices
========================

Deployment
----------

Use Dask-CUDA
~~~~~~~~~~~~~

In order to execute a Dask workflow on multiple GPUs, a Dask "cluster" must
be deployed with Dask Distributed and/or Dask-CUDA.

When running on a single machine, :class:`dask_cuda.LocalCUDACluster` is strongly
recommended. This is still true when there is only a single GPU on the machine.
No matter the device count, using Dask-CUDA has several advantages over default
(threaded) execution:

* Dask-CUDA makes it easy to pin workers to specific devices.
* Dask-CUDA makes it easy to configure memory-spilling options.
* The distributed scheduler collects useful diagnostic information that can be viewed on a dashboard in real time.

Please see `Dask-CUDA's API <https://docs.rapids.ai/api/dask-cuda/stable/>`_
and `Best Practices <https://docs.rapids.ai/api/dask-cuda/stable/examples/best-practices/>`_
documentation for detailed information.
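
For illustration, a minimal single-machine deployment might look like the
following sketch (the ``CUDA_VISIBLE_DEVICES`` value shown is only an example):

.. code-block:: python

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client

    # Sketch: launch one worker process per GPU; CUDA_VISIBLE_DEVICES
    # pins the workers to specific devices.
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1")
    client = Client(cluster)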

.. note::
When running on cloud infrastructure or HPC systems, it is often best to
leverage system-specific deployment libraries like Dask Operator and Dask
Jobqueue.

Please see `RAPIDS-deployment documentation <https://docs.rapids.ai/deployment/stable/>`_
for further details and examples.

Enable cuDF Spilling
~~~~~~~~~~~~~~~~~~~~

When using Dask cuDF for classic ETL workloads, it is usually best
to enable `native spilling support in cuDF
<https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#spilling-to-host-memory>`_.
When using :class:`LocalCUDACluster`, this is easily accomplished by
setting ``enable_cudf_spill=True``.
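
For example, a minimal sketch of this setting:

.. code-block:: python

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client

    # Sketch: enable native cuDF spilling for every worker in the cluster
    cluster = LocalCUDACluster(enable_cudf_spill=True)
    client = Client(cluster)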

When a Dask cuDF workflow includes conversion between DataFrame and Array
representations, native cuDF spilling may be insufficient. For these cases,
JIT unspilling is likely to produce better protection from out-of-memory
(OOM) errors.

Please see `Dask-CUDA's spilling documentation
<https://docs.rapids.ai/api/dask-cuda/24.10/spilling/>`_ for more details.


Reading Data
------------

Tune the partition size
~~~~~~~~~~~~~~~~~~~~~~~

The ideal partition size is typically between 2% and 10% of the memory capacity
of a single GPU. Increasing the partition size will usually reduce the
Member:

Should we provide here a rule-of-thumb as to whether users should initially target more to the 2% or the 10% range, and how/when to increase/decrease that? Or is this too difficult to provide a good rule-of-thumb and the 2-10% phrasing is the best we can do? I understand it can be quite difficult to give more details for general purpose docs, so it's fine if you think the current phrasing is sufficient/best.

Member Author:

Okay, I attempted to turn this into the more-explicit "rule of thumb" I personally use: 1/16 or less if the workflow is memory-intensive (i.e. shuffle intensive), and 1/8 otherwise. The "best" partition size is definitely difficult to know a priori.

number of tasks in your workflow and improve the GPU utilization for each
task. However, if the partitions are too large, the risk of OOM errors can
become significant.

The best way to tune the partition size is to begin with appropriately sized
partitions when the DataFrame collection is first created with a function
like :func:`read_parquet`, :func:`read_csv`, or :func:`from_map`. Both
:func:`read_parquet` and :func:`read_csv` expose a ``blocksize`` argument
for adjusting the maximum partition size.

If the partition size cannot be tuned effectively at creation time, the
:func:`DataFrame.repartition` method can be used as a last resort.
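
As a rough sketch (the dataset path and sizes below are placeholders, not
recommendations):

.. code-block:: python

    import dask_cudf

    # Sketch: control the partition size when the collection is created
    df = dask_cudf.read_parquet("my-dataset/", blocksize="512MiB")

    # Last resort: change the partition count after creation
    df = df.repartition(npartitions=df.npartitions * 2)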


Use Parquet files
~~~~~~~~~~~~~~~~~

`Parquet <https://parquet.apache.org/docs/file-format/>`_ is the recommended
file format for Dask cuDF. It provides efficient columnar storage and enables
Dask to perform valuable query optimizations like column projection and
predicate pushdown.

The most important arguments to :func:`read_parquet` are ``blocksize`` and
``aggregate_files``:

``blocksize``: Use this argument to specify the maximum partition size.
The default is ``"256 MiB"``, but larger values are usually more performant
(e.g. ``1 GiB`` is often safe). Dask will use the ``blocksize`` value to map
Member:

Maybe provide a guideline for when 1 GiB is safe. I imagine this is safe for large devices that we usually work with, but given the recent NO-OOM effort I don't think a small laptop GPU will be capable of handling 1 GiB safely.

Member Author:

I decided to remove the 1 GiB comment since we already discuss the 1/16-1/8 "rule of thumb" above.

a discrete number of Parquet row-groups (or files) to each output partition.
This mapping will only account for the uncompressed storage size of each
row group, which is usually smaller than the corresponding ``cudf.DataFrame``.

``aggregate_files``: Use this argument to specify whether Dask is allowed
to map multiple files to the same DataFrame partition. The default is ``False``,
but ``aggregate_files=True`` is usually more performant when the dataset
contains many files that are smaller than half of ``blocksize``.
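
For example, a minimal sketch combining both arguments (the path and
``blocksize`` value are placeholders):

.. code-block:: python

    import dask_cudf

    # Sketch: aggregate many small files into fewer, larger partitions
    df = dask_cudf.read_parquet(
        "my-dataset/",          # placeholder path
        blocksize="256MiB",     # maximum partition size
        aggregate_files=True,   # allow multiple files per partition
    )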

.. note::
Member:

I like this note. Once we have more cloud IO specific optimizations it might make sense to add it to best practices or create a new one for cloud IO to discuss tips/tricks for those environments.

Member Author:

Agree that we need a lot more remote-IO information. However, it doesn't feel like there is much to say yet :/

Metadata collection can be extremely slow when reading from remote
storage (e.g. S3 and GCS). When reading many remote files that all
correspond to a reasonable partition size, it's usually best to set
``blocksize=None`` and ``aggregate_files=False``. In most cases, these
settings allow Dask to skip the metadata-collection stage altogether.
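
For instance, a remote read might look like the following sketch (the S3 URI
is a placeholder):

.. code-block:: python

    import dask_cudf

    # Sketch: map remote files directly to partitions without splitting
    df = dask_cudf.read_parquet(
        "s3://my-bucket/my-dataset/",  # placeholder URI
        blocksize=None,
        aggregate_files=False,
    )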


Use :func:`from_map`
~~~~~~~~~~~~~~~~~~~~

To implement custom DataFrame-creation logic that is not covered by
existing APIs (like :func:`read_parquet`), use :func:`dask.dataframe.from_map`
whenever possible. The :func:`from_map` API has several advantages
over :func:`from_delayed`:

* It allows proper lazy execution of your custom logic
* It enables column projection (as long as the mapped function supports a ``columns`` keyword argument)

See the `from_map API documentation <https://docs.dask.org/en/stable/generated/dask_expr.from_map.html#dask_expr.from_map>`_
for more details.
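
A minimal sketch of :func:`from_map` usage (the ``read_chunk`` helper and the
file list are hypothetical):

.. code-block:: python

    import cudf
    import dask.dataframe as dd

    def read_chunk(path, columns=None):
        # Hypothetical helper: read one CSV file into a cudf.DataFrame.
        # Accepting ``columns`` lets Dask apply column projection.
        df = cudf.read_csv(path)
        return df[columns] if columns is not None else df

    paths = ["part.0.csv", "part.1.csv"]  # placeholder file list
    ddf = dd.from_map(read_chunk, paths)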


Sorting, Joining and Grouping
-----------------------------

* Make sure spilling is enabled
* Avoid shuffling whenever possible (e.g. use broadcast joins)
* Use ``split_out=1`` for low-cardinality groupby aggregations (see the sketch after this list)
* Use ``broadcast=True`` when at least one collection comprises a small number of partitions (e.g. ``<=5``).
* Use UCX if communication is a bottleneck
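
A rough sketch of the ``split_out`` and ``broadcast`` suggestions (the toy
data and column names are hypothetical):

.. code-block:: python

    import cudf
    import dask_cudf

    # Hypothetical toy collections: a "large" frame and a small lookup table
    ddf = dask_cudf.from_cudf(
        cudf.DataFrame({"key": [0, 1, 0, 1], "x": [1.0, 2.0, 3.0, 4.0]}),
        npartitions=2,
    )
    lookup = dask_cudf.from_cudf(
        cudf.DataFrame({"key": [0, 1], "label": ["a", "b"]}),
        npartitions=1,
    )

    # Low-cardinality groupby: keep the aggregated result in one partition
    agg = ddf.groupby("key").agg({"x": "mean"}, split_out=1)

    # Broadcast join: the small collection is sent to every partition of ddf
    joined = ddf.merge(lookup, on="key", broadcast=True)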

User-defined functions
----------------------

* Use :func:`map_partitions`
* Select the necessary columns before/after :func:`map_partitions` (see the sketch below)
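
A minimal sketch (the ``add_total`` function and column names are
hypothetical):

.. code-block:: python

    import cudf
    import dask_cudf

    ddf = dask_cudf.from_cudf(
        cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}),
        npartitions=2,
    )

    def add_total(df):
        # Hypothetical UDF applied to each cudf.DataFrame partition
        return df.assign(total=df["a"] + df["b"])

    # Select only the columns the UDF needs before mapping it
    result = ddf[["a", "b"]].map_partitions(add_total)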