[train/docs] Extend resource guide (training backend + choosing resources) (ray-project#39202)

Signed-off-by: Kai Fricke <kai@anyscale.com>
Co-authored-by: matthewdeng <matthew.j.deng@gmail.com>
krfricke and matthewdeng authored Sep 8, 2023
1 parent 7cb6037 commit 8190332
Showing 7 changed files with 132 additions and 85 deletions.
15 changes: 0 additions & 15 deletions doc/source/train/doc_code/key_concepts.py
@@ -100,21 +100,6 @@ def train_fn(config):
# __session_checkpoint_end__


# __scaling_config_start__
from ray.train import ScalingConfig

scaling_config = ScalingConfig(
    # Number of distributed workers.
    num_workers=2,
    # Turn on/off GPU.
    use_gpu=True,
    # Specify resources used for trainer.
    trainer_resources={"CPU": 1},
    # Try to schedule workers on different nodes.
    placement_strategy="SPREAD",
)
# __scaling_config_end__

# __run_config_start__
from ray.train import RunConfig
from ray.air.integrations.wandb import WandbLoggerCallback
39 changes: 19 additions & 20 deletions doc/source/train/user-guides/checkpoints.rst
@@ -20,12 +20,12 @@ directory <train-log-dir>` of each run.
Saving checkpoints
------------------

:class:`Checkpoints <ray.train.Checkpoint>` can be saved by calling ``train.report(metrics, checkpoint=Checkpoint(...))`` in the
training function. This saves the checkpoint from the distributed workers to the ``storage_path``. The metrics here are
tied to the checkpoint and are used to filter the top k checkpoints.
:class:`Checkpoints <ray.train.Checkpoint>` can be saved by calling :func:`train.report(metrics, checkpoint=Checkpoint(...)) <ray.train.report>` in the
training function. This saves the checkpoint from the distributed workers to the :attr:`storage_path <ray.train.RunConfig.storage_path>`.
The metrics here are tied to the checkpoint and are used to filter the top k checkpoints.

The latest saved checkpoint can be accessed through the ``checkpoint`` attribute of
the :py:class:`~ray.train.Result`, and the best saved checkpoints can be accessed by the ``best_checkpoints``
The latest saved checkpoint can be accessed through the :attr:`checkpoint <ray.train.Result.checkpoint>` attribute of
the :py:class:`~ray.train.Result`, and the best saved checkpoints can be accessed by the :attr:`best_checkpoints <ray.train.Result.best_checkpoints>`
attribute.
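
As a minimal, framework-agnostic sketch of saving and reporting a checkpoint from the training function (the ``model.pt`` file name and the dummy payload are illustrative assumptions, not part of the original snippet):

.. code-block:: python

    import os
    import tempfile

    from ray import train
    from ray.train import Checkpoint


    def train_fn(config):
        # ... one iteration of training ...
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write framework-specific state; "model.pt" is an illustrative name.
            with open(os.path.join(tmpdir, "model.pt"), "wb") as f:
                f.write(b"<serialized model weights>")  # placeholder payload

            # The metrics reported here are tied to the checkpoint and are
            # used to rank the top-k checkpoints.
            train.report({"loss": 0.1}, checkpoint=Checkpoint.from_directory(tmpdir))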

Concrete examples are provided to demonstrate how checkpoints (model weights but not models) are saved
@@ -96,7 +96,7 @@ appropriately in distributed training.

- collects all the logged metrics from ``trainer.callback_metrics``
- saves a checkpoint via ``trainer.save_checkpoint``
- reports to Ray Train via ``ray.train.report(metrics, checkpoint)``
- reports to Ray Train via :func:`ray.train.report(metrics, checkpoint) <ray.train.report>`

.. code-block:: python
:emphasize-lines: 2,11,20,28,29,30,31,32
@@ -138,8 +138,8 @@ appropriately in distributed training.
result = ray_trainer.fit()
You can always get the saved checkpoint path from ``result.checkpoint`` and
``result.best_checkpoints``.
You can always get the saved checkpoint path from :attr:`result.checkpoint <ray.train.Result.checkpoint>` and
:attr:`result.best_checkpoints <ray.train.Result.best_checkpoints>`.

For more advanced usage (e.g. reporting at a different frequency or reporting
customized checkpoint files), you can implement your own customized callback.
@@ -184,7 +184,7 @@ appropriately in distributed training.
**Option 1: Use Ray Train's default report callback**

We provide a simple callback implementation :class:`~ray.train.huggingface.transformers.RayTrainReportCallback` that
reports on checkpoint save. You can change the checkpointing frequency by `save_strategy` and `save_steps`.
reports on checkpoint save. You can change the checkpointing frequency with ``save_strategy`` and ``save_steps``.
It collects the latest logged metrics and reports them together with the latest saved checkpoint.

.. code-block:: python
@@ -230,11 +230,11 @@ appropriately in distributed training.
Note that :class:`~ray.train.huggingface.transformers.RayTrainReportCallback`
binds the latest metrics and checkpoints together,
so users can properly configure `logging_strategy`, `save_strategy` and `evaluation_strategy`
so users can properly configure ``logging_strategy``, ``save_strategy`` and ``evaluation_strategy``
to ensure the monitoring metric is logged at the same step as checkpoint saving.

For example, the evaluation metrics (`eval_loss` in this case) are logged during
evaluation. If users want to keep the best 3 checkpoints according to `eval_loss`, they
For example, the evaluation metrics (``eval_loss`` in this case) are logged during
evaluation. If users want to keep the best 3 checkpoints according to ``eval_loss``, they
should align the saving and evaluation frequency. Below are two examples of valid configurations:

.. code-block:: python
@@ -258,7 +258,7 @@ appropriately in distributed training.
**Option 2: Implement your customized report callback**

If you feel that Ray Train's default `RayTrainReportCallback` is not sufficient for your use case, you can also
If you feel that Ray Train's default :class:`~ray.train.huggingface.transformers.RayTrainReportCallback` is not sufficient for your use case, you can also
implement a callback yourself! Below is an example implementation that collects the latest metrics
and reports on checkpoint save.

@@ -292,7 +292,7 @@ appropriately in distributed training.
# Clear the metrics buffer
self.metrics = {}
You can customize when(`on_save`, `on_epoch_end`, `on_evaluate`) and
You can customize when (``on_save``, ``on_epoch_end``, ``on_evaluate``) and
what (customized metrics and checkpoint files) to report by implementing your own
Transformers Trainer callback.

@@ -330,13 +330,12 @@ checkpoints to disk), a :py:class:`~ray.train.CheckpointConfig` can be passed in
Loading checkpoints
-------------------

Checkpoints can be loaded into the training function in 2 steps:
:class:`Checkpoints <ray.train.Checkpoint>` can be accessed in the training function with :func:`ray.train.get_checkpoint <ray.train.get_checkpoint>`.

1. From the training function, :func:`ray.train.get_checkpoint` can be used to access
the most recently saved :py:class:`~ray.train.Checkpoint`. This is useful to continue training even
if there's a worker failure.
2. The checkpoint to start training with can be bootstrapped by passing in a
:py:class:`~ray.train.Checkpoint` to ``Trainer`` as the ``resume_from_checkpoint`` argument.
The checkpoint can be populated in two ways:

1. It can be auto-populated, e.g. for :ref:`automatic failure recovery <train-fault-tolerance>` or :ref:`on manual restoration <train-restore-guide>`.
2. The checkpoint can be passed to the :class:`Trainer <ray.train.trainer.BaseTrainer>` as the ``resume_from_checkpoint`` argument.


.. tab-set::
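
For illustration, independent of the framework-specific examples collected above, reading the checkpoint inside the training function could look like this minimal sketch (the ``model.pt`` file name is an assumption carried over from the saving sketch earlier):

.. code-block:: python

    import os

    from ray import train


    def train_fn(config):
        checkpoint = train.get_checkpoint()
        if checkpoint:
            # Materialize the checkpoint as a local directory and restore state.
            with checkpoint.as_directory() as checkpoint_dir:
                state_path = os.path.join(checkpoint_dir, "model.pt")
                # ... load framework-specific state from `state_path` ...

        # ... continue (or start) the training loop ...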
9 changes: 7 additions & 2 deletions doc/source/train/user-guides/fault-tolerance.rst
@@ -12,8 +12,6 @@ Ray Train has built-in fault tolerance to recover from worker failures (i.e.
``RayActorError``\s). When a failure is detected, the workers will be shut
down and new workers will be added.

.. note:: Elastic Training is not yet supported.

The training function will be restarted, but progress from the previous execution can
be resumed through checkpointing.

@@ -32,6 +30,13 @@ passed to the ``Trainer``:
:start-after: __failure_config_start__
:end-before: __failure_config_end__

Which checkpoint will be restored?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Ray Train will automatically resume training from the latest available
:ref:`checkpoint reported to Ray Train <train-checkpointing>`.

This will be the last checkpoint passed to :func:`train.report() <ray.train.report>`.
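
For example, automatic retries could be configured roughly as follows (a sketch; ``train_func`` is assumed to restore its state from :func:`ray.train.get_checkpoint` as described in the checkpointing guide):

.. code-block:: python

    from ray.train import FailureConfig, RunConfig, ScalingConfig
    from ray.train.torch import TorchTrainer

    trainer = TorchTrainer(
        train_func,  # assumed to restore its state from ray.train.get_checkpoint()
        scaling_config=ScalingConfig(num_workers=2),
        # Retry the run up to 3 times when a worker failure is detected.
        run_config=RunConfig(failure_config=FailureConfig(max_failures=3)),
    )
    result = trainer.fit()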

.. _train-restore-guide:

Restore a Ray Train Experiment
20 changes: 18 additions & 2 deletions doc/source/train/user-guides/persistent-storage.rst
@@ -39,8 +39,11 @@ Persistent storage
Ray Train saves results and checkpoints at a persistent storage location.
By default, this is a local directory in ``~/ray_results``.

This default setup is sufficient for single-node setups or distributed
training without :ref:`fault tolerance <train-fault-tolerance>`.
This directory will also contain logs generated on the driver, such as CSV
and JSON logs of the result, and logs that can be read using TensorBoard.

The default setup with a local directory is sufficient for single-node setups
or distributed training without :ref:`fault tolerance <train-fault-tolerance>`.
When you want to utilize fault tolerance, require access to shared data,
or are training on spot instances, it is recommended to set up
a remote persistent storage location.
@@ -60,3 +63,16 @@ network device, such as NFS.
When configuring a persistent storage path, it is important that all nodes have
access to the location.

.. _train-ray-storage:

Automatically setting up persistent storage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can control where to store training results with the ``RAY_STORAGE``
environment variable.

For instance, if you set ``RAY_STORAGE="s3://my_bucket/train_results"``, your
results will automatically be persisted there by default.

If you manually set a :attr:`RunConfig.storage_path <ray.train.RunConfig.storage_path>`, it
will take precedence over the environment variable.
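
As an illustrative sketch (the bucket and run name are placeholders), setting the storage path explicitly could look like this:

.. code-block:: python

    from ray.train import RunConfig

    # An explicit storage_path takes precedence over the RAY_STORAGE environment variable.
    run_config = RunConfig(
        storage_path="s3://my_bucket/train_results",  # placeholder bucket
        name="my_training_run",                       # hypothetical run name
    )
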
8 changes: 8 additions & 0 deletions doc/source/train/user-guides/results.rst
@@ -123,3 +123,11 @@ that was raised.
:start-after: __result_error_start__
:end-before: __result_error_end__


Finding results on persistent storage
-------------------------------------
All training results, including reported metrics, checkpoints, and error files,
are stored on the configured :ref:`persistent storage <train-log-dir>`.

See :ref:`our persistent storage guide <train-log-dir>` to configure this location
for your training run.
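
For example, after a run finishes you could inspect where results and the latest checkpoint were persisted (a minimal sketch assuming ``trainer`` was already constructed; ``Result.path`` and ``Checkpoint.path`` point into the configured storage location):

.. code-block:: python

    result = trainer.fit()

    # Directory (local path or remote URI) of this run on the configured storage.
    print(result.path)

    # The latest reported checkpoint is stored in the same location.
    if result.checkpoint:
        print(result.checkpoint.path)
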
109 changes: 63 additions & 46 deletions doc/source/train/user-guides/using-gpus.rst
@@ -2,29 +2,12 @@

Configuring Scale and GPUs
==========================
Increasing the scale of a Ray Train training run is simple and can often
be done in a few lines of code.

The main interface for configuring scale and resources
is the :class:`~ray.train.ScalingConfig`.

Scaling Configurations in Train (``ScalingConfig``)
---------------------------------------------------

The scaling configuration specifies distributed training properties like the number of workers or the
resources per worker.

The properties of the scaling configuration are :ref:`tunable <tune-search-space-tutorial>`.

.. literalinclude:: ../doc_code/key_concepts.py
:language: python
:start-after: __scaling_config_start__
:end-before: __scaling_config_end__

.. seealso::

See the :class:`~ray.train.ScalingConfig` API reference.
Increasing the scale of a Ray Train training run is simple and can be done in a few lines of code.
The main interface for this is the :class:`~ray.train.ScalingConfig`,
which configures the number of workers and the resources they should use.

In this guide, a *worker* refers to a Ray Train distributed training worker,
which is a :ref:`Ray Actor <actor-key-concept>` that runs your training function.

Increasing the number of workers
--------------------------------
@@ -57,8 +40,40 @@ run on 8 GPUs (8 workers, each using one GPU).
)
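
The collapsed code block above boils down to a scaling configuration along these lines (a sketch of the described setup, not the literal snippet from the file):

.. code-block:: python

    from ray.train import ScalingConfig

    # 8 training workers, each reserving one GPU.
    scaling_config = ScalingConfig(num_workers=8, use_gpu=True)
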
More resources
--------------
Using GPUs in the training function
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When ``use_gpu=True`` is set, Ray Train will automatically set up environment variables
in your training function so that the GPUs can be detected and used
(e.g. ``CUDA_VISIBLE_DEVICES``).

You can get the associated devices with :meth:`ray.train.torch.get_device`.

.. code-block:: python

    import torch
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer, get_device


    def train_func(config):
        assert torch.cuda.is_available()

        device = get_device()
        assert device == torch.device("cuda:0")


    trainer = TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(
            num_workers=1,
            use_gpu=True
        )
    )
    trainer.fit()

Setting the resources per worker
--------------------------------
If you want to allocate more than one CPU or GPU per training worker, or if you
defined :ref:`custom cluster resources <cluster-resources>`, set
the ``resources_per_worker`` attribute:
@@ -76,8 +91,10 @@ the ``resources_per_worker`` attribute:
use_gpu=True,
)
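
As a sketch of the collapsed example above (the concrete resource numbers are illustrative assumptions):

.. code-block:: python

    from ray.train import ScalingConfig

    scaling_config = ScalingConfig(
        num_workers=2,
        use_gpu=True,
        # Each worker reserves 4 CPUs and 1 GPU.
        resources_per_worker={"CPU": 4, "GPU": 1},
    )
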
Note that if you specify GPUs in ``resources_per_worker``, you also need to keep
``use_gpu=True``.
.. note::
If you specify GPUs in ``resources_per_worker``, you also need to set
``use_gpu=True``.

You can also instruct Ray Train to use fractional GPUs. In that case, multiple workers
will be assigned the same CUDA device.
@@ -95,43 +112,42 @@ will be assigned the same CUDA device.
use_gpu=True,
)
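
For instance, two workers sharing a single GPU could be configured like this (a sketch; the exact fraction depends on your memory budget):

.. code-block:: python

    from ray.train import ScalingConfig

    scaling_config = ScalingConfig(
        num_workers=2,
        use_gpu=True,
        # Two workers share a single GPU (0.5 GPUs each).
        resources_per_worker={"GPU": 0.5},
    )
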
Using GPUs in training code
~~~~~~~~~~~~~~~~~~~~~~~~~~~
When ``use_gpu=True`` is set, Ray Train will automatically set up environment variables
in your training loop so that the GPUs can be detected and used
(e.g. ``CUDA_VISIBLE_DEVICES``).
You can get the associated devices with :meth:`ray.train.torch.get_device`.
Setting the communication backend (PyTorch)
-------------------------------------------

.. code-block:: python
.. note::

import torch
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, get_device
This is an advanced setting. In most cases, you don't need to change it.

You can set the PyTorch distributed communication backend (e.g. GLOO or NCCL) by passing a
:class:`~ray.train.torch.TorchConfig` to the :class:`~ray.train.torch.TorchTrainer`.

def train_loop(config):
assert torch.cuda.is_available()
See the `PyTorch API reference <https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group>`__
for valid options.

device = get_device()
assert device == torch.device("cuda:0")
.. code-block:: python
from ray.train.torch import TorchConfig, TorchTrainer
trainer = TorchTrainer(
train_loop,
train_func,
scaling_config=ScalingConfig(
num_workers=1,
use_gpu=True
)
num_workers=num_training_workers,
use_gpu=True,
),
torch_config=TorchConfig(backend="gloo"),
)
trainer.fit()
.. _train_trainer_resources:

Trainer resources
-----------------
So far we've configured resources for each training worker. Technically, each
training worker is a :ref:`Ray Actor <actor-guide>`. Ray Train also schedules
an actor for the :class:`Trainer <ray.train.trainer.BaseTrainer>` object.
an actor for the :class:`Trainer <ray.train.trainer.BaseTrainer>` object when
you call :meth:`Trainer.fit() <ray.train.trainer.BaseTrainer.fit>`.

This object often only manages lightweight communication between the training workers.
You can still specify its resources, which can be useful if you implemented your own
@@ -167,3 +183,4 @@ resources to use 0 CPUs:
"CPU": 0,
}
)
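
A sketch of the setup described above, with no CPU reserved for the lightweight trainer actor (the worker count is illustrative):

.. code-block:: python

    from ray.train import ScalingConfig

    scaling_config = ScalingConfig(
        num_workers=2,
        use_gpu=True,
        # The Trainer actor only coordinates the workers here,
        # so it does not reserve any CPUs.
        trainer_resources={"CPU": 0},
    )
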
17 changes: 17 additions & 0 deletions python/ray/air/config.py
@@ -112,6 +112,23 @@ class ScalingConfig:
placement_strategy: The placement strategy to use for the
placement group of the Ray actors. See :ref:`Placement Group
Strategies <pgroup-strategy>` for the possible options.
Example:

    .. code-block:: python

        from ray.train import ScalingConfig

        scaling_config = ScalingConfig(
            # Number of distributed workers.
            num_workers=2,
            # Turn on/off GPU.
            use_gpu=True,
            # Specify resources used for trainer.
            trainer_resources={"CPU": 1},
            # Try to schedule workers on different nodes.
            placement_strategy="SPREAD",
        )
"""

# If adding new attributes here, please also update
