[sgd] add support for additional resources per worker #18327

Merged · 7 commits · Sep 3, 2021
Changes from 5 commits
11 changes: 9 additions & 2 deletions python/ray/util/sgd/v2/backends/backend.py
@@ -211,6 +211,10 @@ class BackendExecutor:
num_workers (int): Number of workers to use for training.
num_cpus_per_worker (float): Number of CPUs to use per worker.
num_gpus_per_worker (float): Number of GPUs to use per worker.
additional_resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the extra resources that will be
requested for each worker in addition to ``num_cpus_per_worker``
and ``num_gpus_per_worker``.
max_retries (int): Number of retries when Ray actors fail.
Defaults to 3. Set to -1 for unlimited retries.
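
As a hedged illustration of the new argument (not part of the diff itself): `additional_resources_per_worker` is a plain mapping from custom resource names to amounts, passed alongside the CPU and GPU counts. The `TorchConfig` backend and the `"custom_resource"` name below are assumptions for the sketch.

```python
# Illustrative only: BackendExecutor is an internal class, and the
# TorchConfig backend and "custom_resource" name are assumptions here.
import ray
from ray.util.sgd.v2 import TorchConfig
from ray.util.sgd.v2.backends.backend import BackendExecutor

ray.init(num_cpus=2, resources={"custom_resource": 2})
executor = BackendExecutor(
    backend_config=TorchConfig(),
    num_workers=2,
    num_cpus_per_worker=1,
    num_gpus_per_worker=0,
    # Extra resources requested for each worker, on top of CPUs/GPUs.
    additional_resources_per_worker={"custom_resource": 1},
)
# executor.start() (shown in the hunk below) would launch the WorkerGroup.
```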

@@ -230,13 +234,15 @@ def __init__(
num_workers: int = 1,
num_cpus_per_worker: float = 1,
num_gpus_per_worker: float = 0,
max_retries: int = 3,
additional_resources_per_worker: Optional[Dict[str, float]] = None,
max_retries: int = 3
):
self._backend_config = backend_config
self._backend = self._backend_config.backend_cls()
self._num_workers = num_workers
self._num_cpus_per_worker = num_cpus_per_worker
self._num_gpus_per_worker = num_gpus_per_worker
self._additional_resources_per_worker = additional_resources_per_worker
self._max_failures = max_retries
if self._max_failures < 0:
self._max_failures = float("inf")
@@ -256,7 +262,8 @@ def start(self, initialization_hook: Optional[Callable[[], None]] = None):
"""Starts the worker group."""
self.worker_group = WorkerGroup(self._num_workers,
self._num_cpus_per_worker,
self._num_gpus_per_worker)
self._num_gpus_per_worker,
self._additional_resources_per_worker)
try:
if initialization_hook:
self._initialization_hook = initialization_hook
93 changes: 92 additions & 1 deletion python/ray/util/sgd/v2/tests/test_trainer.py
@@ -1,3 +1,4 @@
import os
import time
from pathlib import Path
from unittest.mock import patch
@@ -8,7 +9,7 @@
import ray.util.sgd.v2 as sgd
import tensorflow as tf
import torch

from ray._private.test_utils import wait_for_condition
from ray.util.sgd.v2 import Trainer, TorchConfig, TensorflowConfig, \
HorovodConfig
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend, \
@@ -50,6 +51,14 @@ def ray_start_8_cpus():
ray.shutdown()


@pytest.fixture
def ray_start_4_cpus_4_gpus_4_extra():
address_info = ray.init(num_cpus=4, num_gpus=4, resources={"extra": 4})
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
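
For readers unfamiliar with custom resources: the `resources={"extra": 4}` argument in this fixture registers an arbitrary named resource with the local Ray cluster, which tasks and actors can then request. A minimal standalone sketch, independent of this PR:

```python
import ray

# "extra" is an arbitrary label; Ray only does bookkeeping on the amounts.
ray.init(num_cpus=4, resources={"extra": 4})

@ray.remote(resources={"extra": 1})
def needs_extra():
    return "scheduled on a node advertising the 'extra' resource"

print(ray.get(needs_extra.remote()))
ray.shutdown()
```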


class TestConfig(BackendConfig):
@property
def backend_cls(self):
@@ -910,6 +919,88 @@ def train():
assert output == [1, 1]


@pytest.mark.parametrize("resource", ["CPU", "GPU", "extra"])
@pytest.mark.parametrize("num_requested", [0.5, 1, 2])
def test_resources(ray_start_4_cpus_4_gpus_4_extra, resource, num_requested):
num_workers = 2
config = TestConfig()
original = ray.available_resources().get(resource)
resources_per_worker = {resource: num_requested}
use_gpu = resource == "GPU"
trainer = Trainer(
config,
num_workers=num_workers,
use_gpu=use_gpu,
resources_per_worker=resources_per_worker)

trainer.start()
expected = original - num_workers * num_requested
wait_for_condition(
lambda: ray.available_resources().get(resource, 0) == expected)

trainer.shutdown()
wait_for_condition(
lambda: ray.available_resources().get(resource, 0) == original)
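
The availability check above is simple bookkeeping: the fixture provides 4 units of each resource, so, for example, 2 workers each holding 2 units leave 0 available until `shutdown()` returns them. A tiny worked version of that arithmetic:

```python
# Mirrors the wait_for_condition assertions in test_resources above.
original = 4              # from the fixture: 4 CPUs, 4 GPUs, 4 "extra"
num_workers = 2
num_requested = 2         # one of the parametrized values
expected = original - num_workers * num_requested
assert expected == 0      # fully consumed while the worker group is up
```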


def test_gpu_requests(ray_start_4_cpus_4_gpus_4_extra):

# GPUs should not be requested if `use_gpu` is False.
with pytest.raises(ValueError):
Trainer(
TestConfig(),
num_workers=2,
use_gpu=False,
resources_per_worker={"GPU": 1})

# GPUs should not be set to 0 if `use_gpu` is True.
with pytest.raises(ValueError):
Trainer(
TestConfig(),
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 0})

def get_resources():
return os.environ["CUDA_VISIBLE_DEVICES"]

# 0 GPUs will be requested and should not raise an error.
trainer = Trainer(TestConfig(), num_workers=2, use_gpu=False)
trainer.start()
result = trainer.run(get_resources)
assert result == ["", ""]
trainer.shutdown()

# 1 GPU will be requested and should not raise an error.
trainer = Trainer(TestConfig(), num_workers=2, use_gpu=True)
trainer.start()
result = trainer.run(get_resources)
assert result == ["0,1", "0,1"]
trainer.shutdown()

# Partial GPUs should not raise an error.
trainer = Trainer(
TestConfig(),
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 0.1})
trainer.start()
result = trainer.run(get_resources)
assert result == ["0", "0"]
trainer.shutdown()

# Multiple GPUs should not raise an error.
trainer = Trainer(
TestConfig(),
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 2})
trainer.start()
result = trainer.run(get_resources)
assert result == ["0,1,2,3", "0,1,2,3"]
trainer.shutdown()
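
For context on the `CUDA_VISIBLE_DEVICES` assertions: in plain Ray, an actor sees only the GPU ids it was assigned, while the results above suggest the trainer additionally exposes the devices of co-located workers (hence `"0,1"` for two single-GPU workers). A standalone sketch of the plain-Ray behavior, assuming a machine with at least one GPU:

```python
import os
import ray

ray.init(num_gpus=1)

@ray.remote(num_gpus=0.5)
class FractionalGpuActor:
    def devices(self):
        return os.environ.get("CUDA_VISIBLE_DEVICES", "")

actors = [FractionalGpuActor.remote() for _ in range(2)]
# Both actors fit on GPU 0, so both report "0".
print(ray.get([a.devices.remote() for a in actors]))
ray.shutdown()
```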


if __name__ == "__main__":
import pytest
import sys
37 changes: 30 additions & 7 deletions python/ray/util/sgd/v2/trainer.py
@@ -61,11 +61,17 @@ class Trainer:
a subclass of ``BackendConfig`` can be passed in.
Supported ``str`` values: {"torch"}.
num_workers (int): The number of workers (Ray actors) to launch.
Defaults to 1. Each worker will reserve 1 CPU by default.
Defaults to 1. Each worker will reserve 1 CPU by default. The
number of CPUs reserved by each worker can be overridden with the
``resources_per_worker`` argument.
use_gpu (bool): If True, training will be done on GPUs (1 per
worker). Defaults to False.
worker). Defaults to False. The number of GPUs reserved by each
worker can be overridden with the ``resources_per_worker``
argument.
resources_per_worker (Optional[Dict]): If specified, the resources
defined in this Dict will be reserved for each worker.
defined in this Dict will be reserved for each worker. The
``CPU`` and ``GPU`` keys can be defined to override the number of
CPU/GPUs used by each worker.
logdir (Optional[str]): Path to the file directory where logs
should be persisted. If this is not specified, one will be
generated.
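
A hedged usage sketch of the new argument; the custom resource name `"extra"` matches the test fixture, and the cluster is assumed to expose it along with at least two GPUs:

```python
from ray.util.sgd.v2 import Trainer

trainer = Trainer(
    backend="torch",
    num_workers=2,
    use_gpu=True,
    # Override the per-worker CPU/GPU defaults and additionally request
    # one unit of the custom "extra" resource for each worker.
    resources_per_worker={"CPU": 2, "GPU": 1, "extra": 1},
)
trainer.start()
# trainer.run(train_func)  # train_func is whatever training function you pass
trainer.shutdown()
```
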
@@ -96,15 +102,32 @@ def __init__(
# Setup executor.
backend_config = self._get_backend_config(backend)

num_cpus = 1
num_gpus = int(use_gpu)

if resources_per_worker:
raise NotImplementedError("`resources_per_worker` argument is not "
"supported yet.")
# Override CPU and GPU resources and remove from dict.
# TODO: case-sensitive? Ray uses capital, RayTune uses lower
num_cpus = resources_per_worker.pop("CPU", num_cpus)
num_gpus = resources_per_worker.pop("GPU", num_gpus)
if not use_gpu and num_gpus > 0:
raise ValueError(
"`use_gpu` is False but `GPU` was found in "
"`resources_per_worker`. Either set `use_gpu` to True or "
"remove `GPU` from `resources_per_worker.")
if use_gpu and num_gpus == 0:
raise ValueError(
"`use_gpu` is True but `GPU` is set to 0 in "
"`resources_per_worker`. Either set `use_gpu` to False or "
"request a positive number of `GPU` in "
"`resources_per_worker.")

self._executor = BackendExecutor(
backend_config=backend_config,
num_workers=num_workers,
num_cpus_per_worker=1,
num_gpus_per_worker=int(use_gpu),
num_cpus_per_worker=num_cpus,
num_gpus_per_worker=num_gpus,
additional_resources_per_worker=resources_per_worker,
max_retries=max_retries)

def create_logdir(self, log_dir: Optional[Union[str, Path]]) -> Path:
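
To make the override logic in `__init__` concrete, here is a small sketch of what happens to a mixed dict (values illustrative, `use_gpu=True` assumed). Note that `pop` removes the `CPU`/`GPU` keys, so only the remaining entries are forwarded as `additional_resources_per_worker`; a side effect worth noting is that `pop` mutates the dict the caller passed in.

```python
# Sketch of the CPU/GPU override performed in Trainer.__init__ above.
resources_per_worker = {"CPU": 2, "GPU": 1, "extra": 1}
num_cpus = resources_per_worker.pop("CPU", 1)            # -> 2
num_gpus = resources_per_worker.pop("GPU", int(True))    # -> 1
assert resources_per_worker == {"extra": 1}              # forwarded as-is
```
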
17 changes: 14 additions & 3 deletions python/ray/util/sgd/v2/worker_group.py
@@ -1,5 +1,5 @@
import logging
from typing import Callable, List, TypeVar
from typing import Callable, List, TypeVar, Optional, Dict

import ray
from ray.types import ObjectRef
@@ -39,6 +39,11 @@ class WorkerGroup:
worker. Fractional values are allowed. Defaults to 1.
num_gpus_per_worker (float): The number of GPUs to reserve for each
worker. Fractional values are allowed. Defaults to 0.
additional_resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the extra resources that will be
requested for each worker in addition to ``num_cpus_per_worker``
and ``num_gpus_per_worker``.


Example:

@@ -53,7 +58,9 @@ class WorkerGroup:
def __init__(self,
num_workers: int = 1,
num_cpus_per_worker: float = 1,
num_gpus_per_worker: float = 0):
num_gpus_per_worker: float = 0,
additional_resources_per_worker: Optional[Dict[
str, float]] = None):

if num_workers <= 0:
raise ValueError("The provided `num_workers` must be greater "
@@ -68,10 +75,14 @@ def __init__(self,
self.num_workers = num_workers
self.num_cpus_per_worker = num_cpus_per_worker
self.num_gpus_per_worker = num_gpus_per_worker
self.additional_resources_per_worker = additional_resources_per_worker
self.workers = []
# TODO(matt): Validate resources. Fast-fail if it is impossible to
# handle the request, rather than hang indefinitely.
self._remote_cls = ray.remote(
num_cpus=self.num_cpus_per_worker,
num_gpus=self.num_gpus_per_worker)(BaseWorker)
num_gpus=self.num_gpus_per_worker,
resources=self.additional_resources_per_worker)(BaseWorker)
self.start()

def _create_worker(self):
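
Finally, a hedged sketch of constructing a `WorkerGroup` with the new argument. It is normally created by `BackendExecutor.start()`, and the `execute`/`shutdown` calls below are assumptions based on the (truncated) docstring example above rather than on this diff:

```python
import ray
from ray.util.sgd.v2.worker_group import WorkerGroup

ray.init(num_cpus=2, resources={"extra": 2})

wg = WorkerGroup(
    num_workers=2,
    num_cpus_per_worker=1,
    num_gpus_per_worker=0,
    # Forwarded to ray.remote(..., resources=...) for each BaseWorker.
    additional_resources_per_worker={"extra": 1},
)
# `execute` / `shutdown` are assumed from the WorkerGroup docstring example.
print(wg.execute(lambda: "hello"))
wg.shutdown()
ray.shutdown()
```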