[train] Add documentation for using metadata argument to save preprocessors (#38701)
ericl authored Aug 29, 2023
1 parent 59515e1 commit 02d9a04
Showing 1 changed file with 28 additions and 11 deletions.
doc/source/train/user-guides/data-loading-preprocessing.rst (28 additions, 11 deletions)
@@ -527,10 +527,11 @@ You can use this with Ray Train Trainers by applying them on the dataset before
.. testcode::

    import numpy as np
    from tempfile import TemporaryDirectory

    import ray
    from ray import train
    from ray.train import Checkpoint, ScalingConfig
    from ray.train.torch import TorchTrainer
    from ray.data.preprocessors import Concatenator, StandardScaler

@@ -540,31 +541,47 @@ You can use this with Ray Train Trainers by applying them on the dataset before
    scaler = StandardScaler(columns=["mean radius", "mean texture"])
    concatenator = Concatenator(exclude=["target"], dtype=np.float32)

    # Compute dataset statistics and get transformed datasets. Note that the
    # fit call is executed immediately, but the transformation is lazy.
    dataset = scaler.fit_transform(dataset)
    dataset = concatenator.fit_transform(dataset)

    def train_loop_per_worker():
        context = train.get_context()
        print(context.get_metadata())  # prints {"preprocessor_pkl": ...}

        # Get an iterator to the dataset we passed in below.
        it = train.get_dataset_shard("train")
        for _ in range(2):
            # Prefetch 10 batches at a time.
            for batch in it.iter_batches(batch_size=128, prefetch_batches=10):
                print("Do some training on batch", batch)

        # Save a checkpoint.
        with TemporaryDirectory() as temp_dir:
            train.report(
                {"score": 2.0},
                checkpoint=Checkpoint.from_directory(temp_dir),
            )

    my_trainer = TorchTrainer(
        train_loop_per_worker,
        scaling_config=ScalingConfig(num_workers=2),
        datasets={"train": dataset},
        metadata={"preprocessor_pkl": scaler.serialize()},
    )

    # Get the fitted preprocessor back from the result metadata.
    metadata = my_trainer.fit().checkpoint.get_metadata()
    print(StandardScaler.deserialize(metadata["preprocessor_pkl"]))


.. testoutput::
    :hide:

    ...

In this example, we persist the fitted preprocessor using the ``Trainer(metadata={...})`` constructor argument. This argument specifies a dict that will be available from ``TrainContext.get_metadata()`` and from ``checkpoint.get_metadata()`` for checkpoints saved by the Trainer. This enables recreation of the fitted preprocessor for use in inference.
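
For instance, here is a minimal sketch of reusing the fitted preprocessor at inference time. It assumes ``result`` stands in for the object returned by ``my_trainer.fit()`` above, and ``new_data.csv`` is a hypothetical path for the data you want to run inference on:

.. code-block:: python

    import ray
    from ray.data.preprocessors import StandardScaler

    # Recover the fitted preprocessor from the checkpoint metadata.
    metadata = result.checkpoint.get_metadata()
    scaler = StandardScaler.deserialize(metadata["preprocessor_pkl"])

    # Apply the same, already-fitted transformation to new data before inference.
    new_ds = ray.data.read_csv("new_data.csv")  # hypothetical inference data
    new_ds = scaler.transform(new_ds)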

Performance tips
----------------
@@ -618,14 +635,6 @@ Transformations that you want run per-epoch, such as randomization, should go af

...


Prefetching batches
~~~~~~~~~~~~~~~~~~~
While iterating over your dataset for training, you can increase ``prefetch_batches`` in :meth:`iter_batches <ray.data.DataIterator.iter_batches>` or :meth:`iter_torch_batches <ray.data.DataIterator.iter_torch_batches>` to improve performance further. While training on the current batch, this launches N background threads to fetch and process the next N batches.
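
For instance, a minimal sketch of a per-worker loop that prefetches 10 batches at a time (the batch size and dataset name are illustrative):

.. code-block:: python

    from ray import train

    def train_loop_per_worker():
        it = train.get_dataset_shard("train")
        # Fetch and process the next 10 batches in background threads while
        # the current batch is being used for training.
        for batch in it.iter_torch_batches(batch_size=128, prefetch_batches=10):
            print("Do some training on batch", batch)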
@@ -665,3 +674,11 @@ For example, the following code prefetches 10 batches at a time for each trainin

...


Adding CPU-only nodes to your cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If you are bottlenecked on expensive CPU preprocessing and the preprocessed Dataset is too large to fit in object store memory, then the above tip doesn't work. In this case, since Ray supports heterogeneous clusters, you can add more CPU-only nodes to your cluster.

For cases where you're bottlenecked by object store memory, adding more CPU-only nodes to your cluster increases total cluster object store memory, allowing more data to be buffered in between preprocessing and training stages.

For cases where you're bottlenecked by preprocessing compute time, adding more CPU-only nodes adds more CPU cores to your cluster, further parallelizing preprocessing. If your preprocessing is still not fast enough to saturate GPUs, then add enough CPU-only nodes to :ref:`cache the preprocessed dataset <dataset_cache_performance>`.
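
Once the cluster has enough aggregate object store memory, caching the preprocessed dataset is a one-liner. A minimal sketch, assuming ``dataset`` is the lazily preprocessed dataset from the example above:

.. code-block:: python

    # Execute the lazy preprocessing now and pin the resulting blocks in the
    # cluster's object store, so later epochs reuse the cached data.
    dataset = dataset.materialize()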
