
[Tune] Raise Error when there are insufficient resources. #17957


Merged: 7 commits, Sep 3, 2021
2 changes: 1 addition & 1 deletion doc/source/tune/user-guide.rst
@@ -850,7 +850,7 @@ These are the environment variables Ray Tune currently considers:
* **TUNE_WARN_THRESHOLD_S**: Threshold for logging if a Tune event loop operation takes too long. Defaults to 0.5 (seconds).
* **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S**: Threshold for throwing a warning if no active trials are in ``RUNNING`` state
for this amount of seconds. If the Ray Tune job is stuck in this state (most likely due to insufficient resources),
- the warning message is printed repeatedly every this amount of seconds. Defaults to 1 (seconds).
+ the warning message is printed repeatedly every this amount of seconds. Defaults to 10 (seconds).
* **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER**: Threshold for throwing a warning, when the autoscaler is enabled,
if no active trials are in ``RUNNING`` state for this amount of seconds.
If the Ray Tune job is stuck in this state (most likely due to insufficient resources), the warning message is printed
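As an illustration (not part of this diff), a minimal sketch of how the two thresholds documented above could be adjusted from a user script. The values 30 and 120 seconds are arbitrary examples, not defaults from this PR:

```python
import os

# Hypothetical values: raise the thresholds if your cluster legitimately
# needs longer to free up or provision resources between trials.
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "30"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER"] = "120"

from ray import tune


def train(config):
    pass


tune.run(train, resources_per_trial={"cpu": 1})
```

Because the threshold lookup is wrapped in `@lru_cache()` in the diff below, the variables should be set before `tune.run()` is called.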
57 changes: 20 additions & 37 deletions python/ray/tune/tests/test_ray_trial_executor.py
@@ -1,13 +1,12 @@
# coding: utf-8
from freezegun import freeze_time
from mock import patch
import os
import pytest
import unittest

import ray
from ray import tune
from ray.rllib import _register_all
- from ray.tune import Trainable
+ from ray.tune import Trainable, TuneError
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.result import TRAINING_ITERATION
@@ -20,54 +19,39 @@

class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
def setUp(self):
os.environ["TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S"] = "1"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "1"
self.cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 4,
"num_gpus": 2,
"_system_config": {
"num_heartbeats_timeout": 10
}
})

def tearDown(self):
ray.shutdown()
self.cluster.shutdown()

@freeze_time("2021-08-03", auto_tick_seconds=15)
@patch.object(ray.tune.trial_executor.logger, "warning")
def testOutputWarningMessage(self, mocked_warn):
def train(config):
pass

tune.run(
train, resources_per_trial={
"cpu": 1,
"gpu": 1,
})
msg = (
"No trial is running and no new trial has been started within at"
" least the last 1.0 seconds. This could be due to the cluster "
"not having enough resources available to start the next trial. "
"Please stop the tuning job and readjust resources_per_trial "
"argument passed into tune.run() and/or start a cluster with more "
"resources.")
mocked_warn.assert_called_with(msg)

@freeze_time("2021-08-03")
@patch.object(ray.tune.trial_executor.logger, "warning")
def testNotOutputWarningMessage(self, mocked_warn):
# no autoscaler case, resource is not sufficient. Raise error.
def testRaiseErrorNoAutoscaler(self):
def train(config):
pass

tune.run(
train, resources_per_trial={
"cpu": 1,
"gpu": 1,
})
mocked_warn.assert_not_called()
with pytest.raises(TuneError) as cm:
tune.run(
train,
resources_per_trial={
"cpu": 5, # more than what the cluster can offer.
"gpu": 3,
})
msg = ("You asked for 5.0 cpu and 3.0 gpu per trial, "
"but the cluster only has 4.0 cpu and 2.0 gpu. "
"Stop the tuning job and "
"adjust the resources requested per trial "
"(possibly via `resources_per_trial` "
"or via `num_workers` for rllib) "
"and/or add more resources to your Ray runtime.")
assert str(cm._excinfo[1]) == msg
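As a usage-level illustration (not part of this diff), a user script that over-requests resources on a small local cluster could now surface this error instead of hanging on a PENDING trial. The cluster size and resource numbers below are hypothetical:

```python
import ray
from ray import tune
from ray.tune import TuneError


def train(config):
    pass


# Hypothetical local cluster with 4 CPUs and 2 GPUs.
ray.init(num_cpus=4, num_gpus=2)

try:
    # Asking for 3 GPUs per trial can never be fulfilled here, so with this
    # change Tune raises after the threshold instead of waiting forever.
    tune.run(train, resources_per_trial={"cpu": 1, "gpu": 3})
except TuneError as e:
    print(f"Tune stopped early: {e}")
```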


class RayTrialExecutorTest(unittest.TestCase):
@@ -475,6 +459,5 @@ def tearDown(self):


if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
110 changes: 85 additions & 25 deletions python/ray/tune/trial_executor.py
@@ -6,6 +6,7 @@
import time
from typing import Dict, List, Optional

import ray
from ray.tune.resources import Resources
from ray.util.annotations import DeveloperAPI
from ray.tune.trial import Trial, Checkpoint
@@ -15,37 +16,78 @@
logger = logging.getLogger(__name__)


# Ideally we want to use @cache; but it's only available for python 3.9.
# Caching is only helpful/correct for no autoscaler case.
@lru_cache()
- def _get_warning_threshold() -> float:
+ def _get_cluster_resources_no_autoscaler() -> Dict:
return ray.cluster_resources()


def _get_trial_cpu_and_gpu(trial: Trial) -> Dict:
cpu = trial.resources.cpu + trial.resources.extra_cpu
gpu = trial.resources.gpu + trial.resources.extra_gpu
if trial.placement_group_factory is not None:
cpu = trial.placement_group_factory.required_resources.get("CPU", 0)
gpu = trial.placement_group_factory.required_resources.get("GPU", 0)
return {"CPU": cpu, "GPU": gpu}


def _can_fulfill_no_autoscaler(trial: Trial) -> bool:
"""Calculates if there is enough resources for a PENDING trial.

For no autoscaler case.
"""
assert trial.status == Trial.PENDING
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)

return trial_cpu_gpu["CPU"] <= _get_cluster_resources_no_autoscaler().get(
"CPU", 0
) and trial_cpu_gpu["GPU"] <= _get_cluster_resources_no_autoscaler().get(
"GPU", 0)


@lru_cache()
def _get_insufficient_resources_warning_threshold() -> float:
if is_ray_cluster():
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"))
else:
# Set the default to 10s so that we don't prematurely determine that
# a cluster cannot fulfill the resources requirements.
return float(
os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "1"))
os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "10"))
Contributor:
It seems like on the product, this is being raised too frequently?

Contributor (Author):
The problem in the current version of the code is less about the frequency and more that the assumption behind the condition check is off. See this line. It incorrectly concludes that whenever we reach this check with no running trial, we are not making progress, which is wrong.
E.g. [trial1(PENDING)] --> [trial1(TERMINATED), trial2(PENDING)] --> [trial1(TERMINATED), trial2(TERMINATED), trial3(PENDING)], etc. At every step there is no running trial, yet progress is being made.

This PR addresses the issue by additionally comparing the length of all_trials between checks, which corrects the bug.

As for the frequency: in the autoscaler case, a warning message is printed every 60s. In the no-autoscaler case, we throw an error after 10s if the cluster doesn't have enough resources to fulfill a trial, which is a hard fail (no repeated printing).

If 60s and 10s are too short, we can certainly lengthen them. We just need to trade that off against how long a user sits wondering what is happening with their pending trials before being informed.
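To make the heuristic described above concrete, here is a small standalone sketch (not the PR code itself) of the idea: flag the job as stuck only when no trial is RUNNING and the trial list has not grown. Trial objects are assumed to expose only a `status` attribute:

```python
import time


class StuckDetector:
    """Illustrative only: flags when no trial is RUNNING and no new trials
    have appeared for longer than a threshold."""

    def __init__(self, threshold_s: float = 10.0):
        self._threshold_s = threshold_s
        self._no_running_since = -1.0
        self._last_trial_count = -1

    def check(self, all_trials) -> bool:
        no_running = not any(t.status == "RUNNING" for t in all_trials)
        no_new_trials = len(all_trials) == self._last_trial_count
        stuck = False
        if no_running and no_new_trials:
            if self._no_running_since < 0:
                self._no_running_since = time.monotonic()
            elif time.monotonic() - self._no_running_since > self._threshold_s:
                stuck = True
                self._no_running_since = time.monotonic()
        else:
            # Either a trial is RUNNING or the trial list grew (e.g. trial1
            # TERMINATED and trial2 appeared), so progress is being made.
            self._no_running_since = -1.0
        self._last_trial_count = len(all_trials)
        return stuck
```

The actual change folds the same length comparison into `_may_warn_insufficient_resources` in `trial_executor.py` below.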

Contributor:
We should probably test this in the product.

Contributor (Author):
Sure, although the nature of the bug is more within the core Tune code. See the response to Richard's comment above. I pasted a side-by-side comparison with and without the change. Let me know if you want more coverage.

Also, Richard and Ameer: if we need more time to settle this PR, should we first revert on master?

Contributor:
Actually, I think this PR is good (with a small adjustment to the error message).



# TODO(xwjiang): Consider having a help page with more detailed instructions.
@lru_cache()
- def _get_warning_msg() -> str:
+ def _get_insufficient_resources_warning_msg() -> str:
msg = (
f"No trial is running and no new trial has been started within"
f" at least the last "
f"{_get_insufficient_resources_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime.")
if is_ray_cluster():
return (
f"If autoscaler is still scaling up, ignore this message. No "
f"trial is running and no new trial has been started within at "
f"least the last {_get_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. Please stop the "
f"tuning job and readjust resources_per_trial argument passed "
f"into tune.run() as well as max_workers and worker_nodes "
f"InstanceType specified in cluster.yaml.")
return "If autoscaler is still scaling up, ignore this message. " + msg
else:
return (f"No trial is running and no new trial has been started within"
f" at least the last {_get_warning_threshold()} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. Please stop "
f"the tuning job and readjust resources_per_trial argument "
f"passed into tune.run() and/or start a cluster with more "
f"resources.")
return msg


# A beefed up version when Tune Error is raised.
def _get_insufficient_resources_error_msg(trial: Trial) -> str:
trial_cpu_gpu = _get_trial_cpu_and_gpu(trial)
return (
f"You asked for {trial_cpu_gpu['CPU']} cpu and "
f"{trial_cpu_gpu['GPU']} gpu per trial, but the cluster only has "
f"{_get_cluster_resources_no_autoscaler().get('CPU', 0)} cpu and "
f"{_get_cluster_resources_no_autoscaler().get('GPU', 0)} gpu. "
f"Stop the tuning job and adjust the resources requested per trial "
f"(possibly via `resources_per_trial` or via `num_workers` for rllib) "
f"and/or add more resources to your Ray runtime.")


@DeveloperAPI
@@ -68,12 +110,19 @@ def __init__(self, queue_trials: bool = False):
self._queue_trials = queue_trials
self._cached_trial_state = {}
self._trials_to_cache = set()
# The next two variables are used to keep track of if there is any
# "progress" made between subsequent calls to `on_no_available_trials`.
# TODO(xwjiang): Clean this up once figuring out who should have a
# holistic view of trials - runner or executor.
# Also iterating over list of trials every time is very inefficient.
# Need better visibility APIs into trials.
# The start time since when all active trials have been in PENDING
# state, or since last time we output a resource insufficent
# warning message, whichever comes later.
# -1 means either the TrialExecutor is just initialized without any
# trials yet, or there are some trials in RUNNING state.
self._no_running_trials_since = -1
self._all_trials_size = -1

def set_status(self, trial: Trial, status: str) -> None:
"""Sets status and checkpoints metadata if needed.
@@ -237,17 +286,28 @@ def force_reconcilation_on_next_step_end(self) -> None:
pass

def _may_warn_insufficient_resources(self, all_trials):
if not any(trial.status == Trial.RUNNING for trial in all_trials):
# This is approximately saying we are not making progress.
if len(all_trials) == self._all_trials_size:
if self._no_running_trials_since == -1:
self._no_running_trials_since = time.monotonic()
elif time.monotonic(
) - self._no_running_trials_since > _get_warning_threshold():
# TODO(xwjiang): We should ideally output a more helpful msg.
# https://github.com/ray-project/ray/issues/17799
logger.warning(_get_warning_msg())
elif (time.monotonic() - self._no_running_trials_since >
_get_insufficient_resources_warning_threshold()):
if not is_ray_cluster(): # autoscaler not enabled
# If any of the pending trial cannot be fulfilled,
# that's a good enough hint of trial resources not enough.
for trial in all_trials:
if (trial.status is Trial.PENDING
and not _can_fulfill_no_autoscaler(trial)):
raise TuneError(
_get_insufficient_resources_error_msg(trial))
else:
# TODO(xwjiang): Output a more helpful msg for autoscaler.
# https://github.com/ray-project/ray/issues/17799
logger.warning(_get_insufficient_resources_warning_msg())
self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1
self._all_trials_size = len(all_trials)

def on_no_available_trials(self, trials: List[Trial]) -> None:
"""