Skip to content

Commit

Permalink
[train] Hard-deprecate MosaicTrainer and remove SklearnTrainer (r…
Browse files Browse the repository at this point in the history
…ay-project#42814)

This PR removes some already deprecated APIs to reduce the library surface area and remove unused/unnecessary components. (`MosaicTrainer` can be folded into `TorchTrainer`, and `SklearnTrainer` doesn't provide any value over using Tune with your own training loop.)

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
  • Loading branch information
justinvyu authored Jan 31, 2024
1 parent 6b6f23a commit c13f233
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 1,172 deletions.
16 changes: 0 additions & 16 deletions python/ray/train/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,6 @@ py_test(
deps = [":train_lib"]
)

py_test(
name = "test_mosaic_trainer",
size = "medium",
srcs = ["tests/test_mosaic_trainer.py"],
tags = ["team:ml", "exclusive", "ray_air", "torch_1_11"],
deps = [":train_lib", ":conftest"]
)

py_test(
name = "test_lightgbm_predictor",
size = "small",
Expand Down Expand Up @@ -509,14 +501,6 @@ py_test(
deps = [":train_lib", ":conftest"]
)

py_test(
name = "test_sklearn_trainer",
size = "medium",
srcs = ["tests/test_sklearn_trainer.py"],
tags = ["team:ml", "exclusive", "ray_air"],
deps = [":train_lib"]
)

py_test(
name = "test_tensorflow_checkpoint",
size = "small",
Expand Down
265 changes: 20 additions & 245 deletions python/ray/train/mosaic/mosaic_trainer.py
Original file line number Diff line number Diff line change
@@ -1,255 +1,30 @@
import inspect
import warnings
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Type
from ray.util.annotations import Deprecated

from composer.loggers.logger_destination import LoggerDestination
from composer.trainer import Trainer
_DEPRECATION_MESSAGE = (
"`ray.train.mosaic.MosaicTrainer` is deprecated. "
"Use `ray.train.torch.TorchTrainer` instead. "
"See this issue for more information: "
"https://github.com/ray-project/ray/issues/42893"
)

from ray.train import Checkpoint, DataConfig, RunConfig, ScalingConfig
from ray.train.mosaic._mosaic_utils import RayLogger
from ray.train.torch import TorchConfig, TorchTrainer
from ray.train.trainer import GenDataset
from ray.util import PublicAPI

if TYPE_CHECKING:
from ray.data.preprocessor import Preprocessor


@PublicAPI(stability="alpha")
class MosaicTrainer(TorchTrainer):
"""A Trainer for data parallel Mosaic Composers on PyTorch training.
This Trainer runs the ``composer.trainer.Trainer.fit()`` method on multiple
Ray Actors. The training is carried out in a distributed fashion through PyTorch
DDP. These actors already have the necessary torch process group already
configured for distributed PyTorch training.
The training function ran on every Actor will first run the
specified ``trainer_init_per_worker`` function to obtain an instantiated
``composer.Trainer`` object. The ``trainer_init_per_worker`` function
will have access to preprocessed train and evaluation datasets.
Example:
..
TODO(yunxuan): Enable the test after we resolve the mosaicml dependency issue
.. testcode::
:skipif: True
import torch.utils.data
import torchvision
from torchvision import transforms, datasets
from composer.models.tasks import ComposerClassifier
import composer.optim
from composer.algorithms import LabelSmoothing
import ray
import ray.train as train
from ray.train import ScalingConfig
from ray.train.mosaic import MosaicTrainer
def trainer_init_per_worker(config):
# prepare the model for distributed training and wrap with
# ComposerClassifier for Composer Trainer compatibility
model = torchvision.models.resnet18(num_classes=10)
model = ComposerClassifier(ray.train.torch.prepare_model(model))
# prepare train/test dataset
mean = (0.507, 0.487, 0.441)
std = (0.267, 0.256, 0.276)
cifar10_transforms = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize(mean, std)]
)
data_directory = "~/data"
train_dataset = datasets.CIFAR10(
data_directory,
train=True,
download=True,
transform=cifar10_transforms
)
# prepare train dataloader
batch_size_per_worker = BATCH_SIZE // session.get_world_size()
train_dataloader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size_per_worker
)
train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
# prepare optimizer
optimizer = composer.optim.DecoupledSGDW(
model.parameters(),
lr=0.05,
momentum=0.9,
weight_decay=2.0e-3,
)
return composer.trainer.Trainer(
model=model,
train_dataloader=train_dataloader,
optimizers=optimizer,
**config
)
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer_init_config = {
"max_duration": "1ba",
"algorithms": [LabelSmoothing()],
}
trainer = MosaicTrainer(
trainer_init_per_worker=trainer_init_per_worker,
trainer_init_config=trainer_init_config,
scaling_config=scaling_config,
)
trainer.fit()
.. testoutput::
:hide:
...
Args:
trainer_init_per_worker: The function that returns an instantiated
``composer.Trainer`` object and takes in configuration
dictionary (``config``) as an argument. This dictionary is based on
``trainer_init_config`` and is modified for Ray - Composer integration.
datasets: Any Datasets to use for training. At the moment, we do not support
passing datasets to the trainer and using the dataset shards in the trainer
loop. Instead, configure and load the datasets inside
``trainer_init_per_worker`` function
trainer_init_config: Configurations to pass into ``trainer_init_per_worker`` as
kwargs. Although the kwargs can be hard-coded in the
``trainer_init_per_worker``, using the config allows the flexibility of
reusing the same worker init function while changing the trainer arguments.
For example, when hyperparameter tuning you can reuse the
same ``trainer_init_per_worker`` function with different hyperparameter
values rather than having multiple ``trainer_init_per_worker`` functions
with different hard-coded hyperparameter values.
torch_config: Configuration for setting up the PyTorch backend. If set to
None, use the default configuration. This replaces the ``backend_config``
arg of ``DataParallelTrainer``. Same as in ``TorchTrainer``.
scaling_config: Configuration for how to scale data parallel training.
dataset_config: Configuration for dataset ingest.
run_config: Configuration for the execution of the training run.
resume_from_checkpoint: A ``ray.train.Checkpoint`` to resume training from.
# TODO(justinvyu): [code_removal] Delete in Ray 2.11.
@Deprecated
class MosaicTrainer:
"""Deprecated. See this issue for more information:
https://github.com/ray-project/ray/issues/42893
"""

def __init__(
self,
trainer_init_per_worker: Callable[[Optional[Dict]], Trainer],
*,
datasets: Optional[Dict[str, GenDataset]] = None,
trainer_init_config: Optional[Dict] = None,
torch_config: Optional[TorchConfig] = None,
scaling_config: Optional[ScalingConfig] = None,
dataset_config: Optional[DataConfig] = None,
run_config: Optional[RunConfig] = None,
preprocessor: Optional["Preprocessor"] = None,
resume_from_checkpoint: Optional[Checkpoint] = None,
):

warnings.warn(
"This MosaicTrainer will be deprecated in Ray 2.8. "
"It is recommended to use the TorchTrainer instead.",
DeprecationWarning,
)

self._validate_trainer_init_per_worker(
trainer_init_per_worker, "trainer_init_per_worker"
)
def __new__(cls, *args, **kwargs):
raise DeprecationWarning(_DEPRECATION_MESSAGE)

self._validate_datasets(datasets)
self._validate_trainer_init_config(trainer_init_config)

if resume_from_checkpoint:
# TODO(ml-team): Reenable after Mosaic checkpointing is supported
raise NotImplementedError

super().__init__(
train_loop_per_worker=_mosaic_train_loop_per_worker,
train_loop_config=self._create_trainer_init_config(
trainer_init_per_worker, trainer_init_config
),
torch_config=torch_config,
scaling_config=scaling_config,
dataset_config=dataset_config,
run_config=run_config,
datasets=datasets,
preprocessor=preprocessor,
resume_from_checkpoint=resume_from_checkpoint,
)
def __init__(self, *args, **kwargs):
raise DeprecationWarning(_DEPRECATION_MESSAGE)

@classmethod
def _create_trainer_init_config(
cls,
trainer_init_per_worker: Callable[[Optional[Dict]], Trainer],
trainer_init_config: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
trainer_init_config = trainer_init_config.copy() if trainer_init_config else {}
if "_trainer_init_per_worker" in trainer_init_config:
raise ValueError(
"'_trainer_init_per_worker' is a reserved key in `trainer_init_config`."
)
trainer_init_config["_trainer_init_per_worker"] = trainer_init_per_worker
return trainer_init_config
def restore(cls, *args, **kwargs):
raise DeprecationWarning(_DEPRECATION_MESSAGE)

@classmethod
def restore(cls: Type["MosaicTrainer"], **kwargs) -> "MosaicTrainer":
# TODO(ml-team): Reenable after Mosaic checkpointing is supported
raise NotImplementedError

def _validate_trainer_init_per_worker(
self, trainer_init_per_worker: Callable, fn_name: str
) -> None:
num_params = len(inspect.signature(trainer_init_per_worker).parameters)
if num_params != 1:
raise ValueError(
f"{fn_name} should take in at most 1 argument (`config`), "
f"but it accepts {num_params} arguments instead."
)

def _validate_datasets(self, datasets) -> None:
if not (datasets is None or len(datasets) == 0):
raise ValueError(
"MosaicTrainer does not support providing dataset shards \
to `trainer_init_per_worker`. Instead of passing in the dataset into \
MosaicTrainer, define a dataloader and use `prepare_dataloader` \
inside the `trainer_init_per_worker`."
)

def _validate_trainer_init_config(self, config) -> None:
if config is not None and "loggers" in config:
warnings.warn(
"Composer's Loggers (any subclass of LoggerDestination) are \
not supported for MosaicComposer. Use Ray provided loggers instead"
)


def _mosaic_train_loop_per_worker(config):
"""Per-worker training loop for Mosaic Composers."""
trainer_init_per_worker = config.pop("_trainer_init_per_worker")

# Replace Composer's Loggers with RayLogger
ray_logger = RayLogger(keys=config.pop("log_keys", []))

# initialize Composer trainer
trainer: Trainer = trainer_init_per_worker(config)

# Remove Composer's Loggers if there are any added in the trainer_init_per_worker
# this removes the logging part of the loggers
filtered_callbacks = list()
for callback in trainer.state.callbacks:
if not isinstance(callback, LoggerDestination):
filtered_callbacks.append(callback)
filtered_callbacks.append(ray_logger)
trainer.state.callbacks = filtered_callbacks

# this prevents data to be routed to all the Composer Loggers
trainer.logger.destinations = (ray_logger,)

# call the trainer
trainer.fit()
def can_restore(cls, *args, **kwargs):
raise DeprecationWarning(_DEPRECATION_MESSAGE)
26 changes: 0 additions & 26 deletions python/ray/train/sklearn/_sklearn_utils.py

This file was deleted.

17 changes: 16 additions & 1 deletion python/ray/train/sklearn/sklearn_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@
from ray.air.util.data_batch_conversion import _unwrap_ndarray_object_type_if_needed
from ray.train.predictor import Predictor
from ray.train.sklearn import SklearnCheckpoint
from ray.train.sklearn._sklearn_utils import _set_cpu_params
from ray.util.annotations import PublicAPI
from ray.util.joblib import register_ray

if TYPE_CHECKING:
from ray.data.preprocessor import Preprocessor


# thread_count is a catboost parameter
SKLEARN_CPU_PARAM_NAMES = ["n_jobs", "thread_count"]


def _set_cpu_params(estimator: BaseEstimator, num_cpus: int) -> None:
"""Sets all CPU-related params to num_cpus (incl. nested)."""
cpu_params = {
param: num_cpus
for param in estimator.get_params(deep=True)
if any(
param.endswith(cpu_param_name) for cpu_param_name in SKLEARN_CPU_PARAM_NAMES
)
}
estimator.set_params(**cpu_params)


@PublicAPI(stability="alpha")
class SklearnPredictor(Predictor):
"""A predictor for scikit-learn compatible estimators.
Expand Down
Loading

0 comments on commit c13f233

Please sign in to comment.