[sgd] Implement resources_per_worker (#18327)
* [sgd] add support for additional resources per worker

* [sgd] add support for additional resources per worker

* update test

* lint

* update comments for case-sensitivity
matthewdeng authored Sep 3, 2021
1 parent 01adf03 commit 26f73eb
Showing 4 changed files with 145 additions and 16 deletions.
15 changes: 10 additions & 5 deletions python/ray/util/sgd/v2/backends/backend.py
@@ -211,6 +211,10 @@ class BackendExecutor:
num_workers (int): Number of workers to use for training.
num_cpus_per_worker (float): Number of CPUs to use per worker.
num_gpus_per_worker (float): Number of GPUs to use per worker.
additional_resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the extra resources that will be
requested for each worker in addition to ``num_cpus_per_worker``
and ``num_gpus_per_worker``.
max_retries (int): Number of retries when Ray actors fail.
Defaults to 3. Set to -1 for unlimited retries.
@@ -230,13 +234,14 @@ def __init__(
num_workers: int = 1,
num_cpus_per_worker: float = 1,
num_gpus_per_worker: float = 0,
max_retries: int = 3,
):
additional_resources_per_worker: Optional[Dict[str, float]] = None,
max_retries: int = 3):
self._backend_config = backend_config
self._backend = self._backend_config.backend_cls()
self._num_workers = num_workers
self._num_cpus_per_worker = num_cpus_per_worker
self._num_gpus_per_worker = num_gpus_per_worker
self._additional_resources_per_worker = additional_resources_per_worker
self._max_failures = max_retries
if self._max_failures < 0:
self._max_failures = float("inf")
@@ -254,9 +259,9 @@ def __init__(

def start(self, initialization_hook: Optional[Callable[[], None]] = None):
"""Starts the worker group."""
self.worker_group = WorkerGroup(self._num_workers,
self._num_cpus_per_worker,
self._num_gpus_per_worker)
self.worker_group = WorkerGroup(
self._num_workers, self._num_cpus_per_worker,
self._num_gpus_per_worker, self._additional_resources_per_worker)
try:
if initialization_hook:
self._initialization_hook = initialization_hook
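
The executor change above only stores the new dictionary and forwards it to `WorkerGroup` when `start()` is called. A minimal sketch of constructing the executor directly with a custom resource (illustrative only: `BackendExecutor` is an internal class normally created by `Trainer`, and the `"custom_resource"` name is a placeholder for whatever resource a cluster advertises):

```python
import ray
from ray.util.sgd.v2 import TorchConfig
from ray.util.sgd.v2.backends.backend import BackendExecutor

# Assumes a local cluster that advertises the matching custom resource.
ray.init(num_cpus=4, resources={"custom_resource": 2})

executor = BackendExecutor(
    backend_config=TorchConfig(),
    num_workers=2,
    num_cpus_per_worker=1,
    num_gpus_per_worker=0,
    additional_resources_per_worker={"custom_resource": 1},
    max_retries=3,
)
executor.start()  # each worker reserves 1 CPU plus 1 "custom_resource"
```
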
93 changes: 92 additions & 1 deletion python/ray/util/sgd/v2/tests/test_trainer.py
@@ -1,3 +1,4 @@
import os
import time
from pathlib import Path
from unittest.mock import patch
@@ -8,7 +9,7 @@
import ray.util.sgd.v2 as sgd
import tensorflow as tf
import torch

from ray._private.test_utils import wait_for_condition
from ray.util.sgd.v2 import Trainer, TorchConfig, TensorflowConfig, \
HorovodConfig
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend, \
@@ -50,6 +51,14 @@ def ray_start_8_cpus():
ray.shutdown()


@pytest.fixture
def ray_start_4_cpus_4_gpus_4_extra():
address_info = ray.init(num_cpus=4, num_gpus=4, resources={"extra": 4})
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()


class TestConfig(BackendConfig):
@property
def backend_cls(self):
@@ -910,6 +919,88 @@ def train():
assert output == [1, 1]


@pytest.mark.parametrize("resource", ["CPU", "GPU", "extra"])
@pytest.mark.parametrize("num_requested", [0.5, 1, 2])
def test_resources(ray_start_4_cpus_4_gpus_4_extra, resource, num_requested):
num_workers = 2
config = TestConfig()
original = ray.available_resources().get(resource)
resources_per_worker = {resource: num_requested}
use_gpu = resource == "GPU"
trainer = Trainer(
config,
num_workers=num_workers,
use_gpu=use_gpu,
resources_per_worker=resources_per_worker)

trainer.start()
expected = original - num_workers * num_requested
wait_for_condition(
lambda: ray.available_resources().get(resource, 0) == expected)

trainer.shutdown()
wait_for_condition(
lambda: ray.available_resources().get(resource, 0) == original)
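
For intuition on the assertion above, the same accounting worked out for one parametrization (`GPU`, 0.5 per worker) under the 4-CPU/4-GPU/4-"extra" fixture:

```python
# Worked instance of the expected-value computation in test_resources.
original = 4.0          # ray.available_resources()["GPU"] before trainer.start()
num_workers = 2
num_requested = 0.5     # fractional resource requests are allowed
expected = original - num_workers * num_requested
assert expected == 3.0  # one full GPU is reserved across the two workers
```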


def test_gpu_requests(ray_start_4_cpus_4_gpus_4_extra):

# GPUs should not be requested if `use_gpu` is False.
with pytest.raises(ValueError):
Trainer(
TestConfig(),
num_workers=2,
use_gpu=False,
resources_per_worker={"GPU": 1})

# GPUs should not be set to 0 if `use_gpu` is True.
with pytest.raises(ValueError):
Trainer(
TestConfig(),
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 0})

def get_resources():
return os.environ["CUDA_VISIBLE_DEVICES"]

# 0 GPUs will be requested and should not raise an error.
trainer = Trainer(TestConfig(), num_workers=2, use_gpu=False)
trainer.start()
result = trainer.run(get_resources)
assert result == ["", ""]
trainer.shutdown()

# 1 GPU will be requested and should not raise an error.
trainer = Trainer(TestConfig(), num_workers=2, use_gpu=True)
trainer.start()
result = trainer.run(get_resources)
assert result == ["0,1", "0,1"]
trainer.shutdown()

# Partial GPUs should not raise an error.
trainer = Trainer(
TestConfig(),
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 0.1})
trainer.start()
result = trainer.run(get_resources)
assert result == ["0", "0"]
trainer.shutdown()

# Multiple GPUs should not raise an error.
trainer = Trainer(
TestConfig(),
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 2})
trainer.start()
result = trainer.run(get_resources)
assert result == ["0,1,2,3", "0,1,2,3"]
trainer.shutdown()
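
The assertions above read `CUDA_VISIBLE_DEVICES` inside each worker. Note that in the test both workers report the same string containing every assigned device ID (e.g. `"0,1"` when each worker requests one GPU), so workers on a node can see each other's devices. A bare Ray GPU task, by contrast, only sees its own assignment; a minimal sketch of that base behavior (output depends on the machine and is shown only as an illustration):

```python
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
def visible_devices():
    # Ray exports CUDA_VISIBLE_DEVICES for each GPU task it schedules.
    return os.environ.get("CUDA_VISIBLE_DEVICES", "")

# Each task sees only the GPU assigned to it, e.g. ['0', '1'].
print(ray.get([visible_devices.remote() for _ in range(2)]))
```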


if __name__ == "__main__":
import pytest
import sys
36 changes: 29 additions & 7 deletions python/ray/util/sgd/v2/trainer.py
@@ -61,11 +61,17 @@ class Trainer:
a subclass of ``BackendConfig`` can be passed in.
Supported ``str`` values: {"torch"}.
num_workers (int): The number of workers (Ray actors) to launch.
Defaults to 1. Each worker will reserve 1 CPU by default.
Defaults to 1. Each worker will reserve 1 CPU by default. The
number of CPUs reserved by each worker can be overridden with the
``resources_per_worker`` argument.
use_gpu (bool): If True, training will be done on GPUs (1 per
worker). Defaults to False.
worker). Defaults to False. The number of GPUs reserved by each
worker can be overridden with the ``resources_per_worker``
argument.
resources_per_worker (Optional[Dict]): If specified, the resources
defined in this Dict will be reserved for each worker.
defined in this Dict will be reserved for each worker. The
``CPU`` and ``GPU`` keys (case-sensitive) can be defined to
override the number of CPUs and GPUs used by each worker.
logdir (Optional[str]): Path to the file directory where logs
should be persisted. If this is not specified, one will be
generated.
@@ -96,15 +102,31 @@ def __init__(
# Setup executor.
backend_config = self._get_backend_config(backend)

num_cpus = 1
num_gpus = int(use_gpu)

if resources_per_worker:
raise NotImplementedError("`resources_per_worker` argument is not "
"supported yet.")
# Override CPU and GPU resources and remove from dict.
num_cpus = resources_per_worker.pop("CPU", num_cpus)
num_gpus = resources_per_worker.pop("GPU", num_gpus)
if not use_gpu and num_gpus > 0:
raise ValueError(
"`use_gpu` is False but `GPU` was found in "
"`resources_per_worker`. Either set `use_gpu` to True or "
"remove `GPU` from `resources_per_worker`.")
if use_gpu and num_gpus == 0:
raise ValueError(
"`use_gpu` is True but `GPU` is set to 0 in "
"`resources_per_worker`. Either set `use_gpu` to False or "
"request a positive number of `GPU` in "
"`resources_per_worker`.")

self._executor = BackendExecutor(
backend_config=backend_config,
num_workers=num_workers,
num_cpus_per_worker=1,
num_gpus_per_worker=int(use_gpu),
num_cpus_per_worker=num_cpus,
num_gpus_per_worker=num_gpus,
additional_resources_per_worker=resources_per_worker,
max_retries=max_retries)

def create_logdir(self, log_dir: Optional[Union[str, Path]]) -> Path:
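
With the trainer change above, `resources_per_worker` is now honored: the `CPU` and `GPU` keys (case-sensitive) override the per-worker defaults, and any remaining keys are forwarded as custom Ray resources. A short usage sketch (the `"custom_resource"` key is a placeholder for whatever resource a cluster advertises):

```python
from ray.util.sgd.v2 import Trainer

# Two workers, each reserving 2 CPUs, half a GPU, and 1 unit of a custom resource.
trainer = Trainer(
    backend="torch",
    num_workers=2,
    use_gpu=True,
    resources_per_worker={"CPU": 2, "GPU": 0.5, "custom_resource": 1},
)
trainer.start()
# ... trainer.run(train_func) ...
trainer.shutdown()
```

Because the `CPU` and `GPU` keys are popped out of the dictionary before it reaches the executor, mixing them with custom keys in the same dict is safe.
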
17 changes: 14 additions & 3 deletions python/ray/util/sgd/v2/worker_group.py
@@ -1,5 +1,5 @@
import logging
from typing import Callable, List, TypeVar
from typing import Callable, List, TypeVar, Optional, Dict

import ray
from ray.types import ObjectRef
@@ -39,6 +39,11 @@ class WorkerGroup:
worker. Fractional values are allowed. Defaults to 1.
num_gpus_per_worker (float): The number of GPUs to reserve for each
worker. Fractional values are allowed. Defaults to 0.
additional_resources_per_worker (Optional[Dict[str, float]]):
Dictionary specifying the extra resources that will be
requested for each worker in addition to ``num_cpus_per_worker``
and ``num_gpus_per_worker``.
Example:
@@ -53,7 +58,9 @@ class WorkerGroup:
def __init__(self,
num_workers: int = 1,
num_cpus_per_worker: float = 1,
num_gpus_per_worker: float = 0):
num_gpus_per_worker: float = 0,
additional_resources_per_worker: Optional[Dict[
str, float]] = None):

if num_workers <= 0:
raise ValueError("The provided `num_workers` must be greater "
@@ -68,10 +75,14 @@ def __init__(self,
self.num_workers = num_workers
self.num_cpus_per_worker = num_cpus_per_worker
self.num_gpus_per_worker = num_gpus_per_worker
self.additional_resources_per_worker = additional_resources_per_worker
self.workers = []
# TODO(matt): Validate resources. Fast-fail if it is impossible to
# handle the request, rather than hang indefinitely.
self._remote_cls = ray.remote(
num_cpus=self.num_cpus_per_worker,
num_gpus=self.num_gpus_per_worker)(BaseWorker)
num_gpus=self.num_gpus_per_worker,
resources=self.additional_resources_per_worker)(BaseWorker)
self.start()

def _create_worker(self):
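
At the bottom of the stack, `WorkerGroup` forwards the dictionary to `ray.remote(..., resources=...)`, so every `BaseWorker` actor reserves the custom resources alongside its CPUs and GPUs. A direct-use sketch (the `"accelerator"` resource name is a placeholder, and `WorkerGroup` is normally created by `BackendExecutor` rather than by hand):

```python
import ray
from ray.util.sgd.v2.worker_group import WorkerGroup

# Assumes a local cluster that advertises the matching custom resource.
ray.init(num_cpus=4, resources={"accelerator": 2})

# Each of the two workers reserves 1 CPU and 1 "accelerator".
wg = WorkerGroup(
    num_workers=2,
    num_cpus_per_worker=1,
    num_gpus_per_worker=0,
    additional_resources_per_worker={"accelerator": 1},
)
print(wg.execute(lambda: "ready"))  # runs the function on every worker
wg.shutdown()
```
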
