Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot] Expose failure reason for spot jobs #1655

Merged
merged 39 commits into from
Feb 5, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4ae4a2a
Expose failure reason for spot jobs
Michaelvll Feb 1, 2023
f88a408
Add failure reason for normal failure
Michaelvll Feb 1, 2023
3786936
Failure reason hint for sky logs sky-spot-controller
Michaelvll Feb 1, 2023
fbf720d
require failure reason for all
Michaelvll Feb 1, 2023
bd75460
Fix the conftest
Michaelvll Feb 1, 2023
585b268
fix controller name
Michaelvll Feb 1, 2023
b90bf51
revert SKYPILOT_USER
Michaelvll Feb 1, 2023
8aa176b
Show controller process logs with sky spot logs for better UX
Michaelvll Feb 1, 2023
7ab8355
revert usage user ID
Michaelvll Feb 1, 2023
c6c0f45
do not overwrite failure reason for resource unavailable
Michaelvll Feb 2, 2023
5318c43
format
Michaelvll Feb 2, 2023
a609901
lint
Michaelvll Feb 2, 2023
bda6c72
address comments
Michaelvll Feb 2, 2023
4516596
fix comment
Michaelvll Feb 2, 2023
fcbabd2
Update docs/source/examples/spot-jobs.rst
Michaelvll Feb 3, 2023
d034b0b
improve readability and refactoring
Michaelvll Feb 3, 2023
cb501a7
Merge branch 'fix-spot-status-for-cluster-name' of github.com:concret…
Michaelvll Feb 3, 2023
555f0ec
address comments
Michaelvll Feb 3, 2023
5c2d005
format
Michaelvll Feb 3, 2023
903deb9
Add comment
Michaelvll Feb 3, 2023
11ef41c
address comments
Michaelvll Feb 3, 2023
0ee6703
format
Michaelvll Feb 3, 2023
78696c6
Add failover history to the error raised by _launch
Michaelvll Feb 3, 2023
b054f10
Add comment
Michaelvll Feb 3, 2023
cb773ab
Update sky/spot/recovery_strategy.py
Michaelvll Feb 4, 2023
087c31b
refactor
Michaelvll Feb 4, 2023
5d6723b
Address comment
Michaelvll Feb 4, 2023
d3726ed
Update sky/spot/recovery_strategy.py
Michaelvll Feb 4, 2023
f524a41
format
Michaelvll Feb 4, 2023
1deab6f
Merge branch 'fix-spot-status-for-cluster-name' of github.com:concret…
Michaelvll Feb 4, 2023
1bf7993
format
Michaelvll Feb 4, 2023
5b43ae9
fix exception name
Michaelvll Feb 4, 2023
c409b66
refactor a bit
Michaelvll Feb 4, 2023
629b2d4
Add more comments
Michaelvll Feb 4, 2023
ccf8469
format
Michaelvll Feb 4, 2023
14b1b74
fix
Michaelvll Feb 4, 2023
f2ac7e9
fix logs
Michaelvll Feb 5, 2023
6505fd2
adopt suggestions
Michaelvll Feb 5, 2023
47be483
Fix rendering
Michaelvll Feb 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/examples/spot-jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ Here are some commands for managed spot jobs. Check :code:`sky spot --help` for
# Cancel a spot job by name
$ sky spot cancel -n bert-qa

.. note::
  If a spot job fails, you can check :code:`sky spot queue -a` for a brief reason
  for the failure. For more details, it is helpful to check :code:`sky spot logs --controller <job_id>`.


Spot controller (Advanced)
-------------------------------
Expand Down
6 changes: 3 additions & 3 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,17 +743,17 @@ def write_cluster_config(
- 'tpu-create-script' (if TPU is requested)
- 'tpu-delete-script' (if TPU is requested)
Raises:
ResourceUnavailableError: if the region/zones requested does not appear
exceptions.ResourcesUnavailableError: if the region/zones requested does not appear
in the catalog, or an ssh_proxy_command is specified but not for the given region.
"""
# task.best_resources may not be equal to to_provision if the user
# is running a job with less resources than the cluster has.
cloud = to_provision.cloud
# This can raise a ResourceUnavailableError, when the region/zones requested
# This can raise a ResourcesUnavailableError, when the region/zones requested
# does not appear in the catalog. It can be triggered when the user changed
# the catalog file, while there is a cluster in the removed region/zone.
# TODO(zhwu): We should change the exception type to a more specific one,
# as the ResourceUnavailableError is overly used. Also, it would be better
# as the ResourcesUnavailableError is overly used. Also, it would be better
# to move the check out of this function, i.e. the caller should be
# responsible for the validation.
resources_vars = cloud.make_deploy_resources_variables(
Expand Down
57 changes: 36 additions & 21 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,8 @@ def _yield_region_zones(self, to_provision: resources_lib.Resources,
except FileNotFoundError:
# Happens if no previous cluster.yaml exists.
pass
if region is not None and cluster_exists:

if region is not None and cluster_exists:
region = clouds.Region(name=region)
if zones is not None:
zones = [clouds.Zone(name=zone) for zone in zones.split(',')]
Expand Down Expand Up @@ -1662,6 +1662,8 @@ def provision_with_retries(
launchable_retries_disabled = (self._dag is None or
self._optimize_target is None)

failover_history: List[Exception] = list()

style = colorama.Style
# Retrying launchable resources.
while True:
Expand Down Expand Up @@ -1699,16 +1701,18 @@ def provision_with_retries(
logger.warning(common_utils.format_exception(e))
self._blocked_resources.add(
resources_lib.Resources(cloud=to_provision.cloud))
failover_history.append(e)
except exceptions.ResourcesUnavailableError as e:
failover_history.append(e)
if e.no_failover:
raise e
raise e.with_failover_history(failover_history)
if launchable_retries_disabled:
logger.warning(
'DAG and optimize_target needs to be registered first '
'to enable cross-cloud retry. '
'To fix, call backend.register_info(dag=dag, '
'optimize_target=sky.OptimizeTarget.COST)')
raise e
raise e.with_failover_history(failover_history)

logger.warning(common_utils.format_exception(e))
else:
Expand Down Expand Up @@ -1741,9 +1745,18 @@ def provision_with_retries(
# (otherwise will skip re-optimizing this task).
# TODO: set all remaining tasks' best_resources to None.
task.best_resources = None
self._dag = sky.optimize(self._dag,
minimize=self._optimize_target,
blocked_resources=self._blocked_resources)
try:
self._dag = sky.optimize(
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
self._dag,
minimize=self._optimize_target,
blocked_resources=self._blocked_resources)
except exceptions.ResourcesUnavailableError as e:
# Optimizer failed to find a feasible resources for the task,
# either because the previous failovers have blocked all the
# possible resources or the requested resources is too
# restrictive. If we reach here, our failover logic finally
# ends here.
raise e.with_failover_history(failover_history)
to_provision = task.best_resources
assert task in self._dag.tasks, 'Internal logic error.'
assert to_provision is not None, task
Expand Down Expand Up @@ -2177,7 +2190,8 @@ def _provision(self,
'`--retry-until-up` flag.')
with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesUnavailableError(
error_message) from None
error_message,
failover_history=e.failover_history) from None
if dryrun:
return
cluster_config_file = config_dict['ray']
Expand Down Expand Up @@ -2517,20 +2531,21 @@ def _exec_code_on_head(
finally:
name = handle.cluster_name
if name == spot_lib.SPOT_CONTROLLER_NAME:
logger.info(f'{fore.CYAN}Spot Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}'
'\nTo cancel the job:\t\t'
f'{backend_utils.BOLD}sky spot cancel {job_id}'
f'{backend_utils.RESET_BOLD}'
'\nTo stream job logs:\t\t'
f'{backend_utils.BOLD}sky spot logs {job_id}'
f'{backend_utils.RESET_BOLD}'
f'\nTo stream controller logs:\t'
f'{backend_utils.BOLD}sky logs {name} {job_id}'
f'{backend_utils.RESET_BOLD}'
'\nTo view all spot jobs:\t\t'
f'{backend_utils.BOLD}sky spot queue'
f'{backend_utils.RESET_BOLD}')
logger.info(
f'{fore.CYAN}Spot Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}'
'\nTo cancel the job:\t\t'
f'{backend_utils.BOLD}sky spot cancel {job_id}'
f'{backend_utils.RESET_BOLD}'
'\nTo stream job logs:\t\t'
f'{backend_utils.BOLD}sky spot logs {job_id}'
f'{backend_utils.RESET_BOLD}'
f'\nTo stream controller logs:\t'
f'{backend_utils.BOLD}sky spot logs --controller {job_id}'
f'{backend_utils.RESET_BOLD}'
'\nTo view all spot jobs:\t\t'
f'{backend_utils.BOLD}sky spot queue'
f'{backend_utils.RESET_BOLD}')
else:
logger.info(f'{fore.CYAN}Job ID: '
f'{style.BRIGHT}{job_id}{style.RESET_ALL}'
Expand Down
33 changes: 24 additions & 9 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3076,11 +3076,6 @@ def spot_launch(
"""
if name is None:
name = backend_utils.generate_cluster_name()
else:
# This does the basic regex check for the cluster name, while the name
# length check will be done by the controller when it starts
# provisioning the cluster.
clouds.Cloud.check_cluster_name_is_valid(name)

task = _make_task_from_entrypoint_with_overrides(
entrypoint,
Expand All @@ -3105,6 +3100,15 @@ def spot_launch(
if prompt is not None:
click.confirm(prompt, default=True, abort=True, show_default=True)

# We try our best to validate the cluster name before we launch the task.
# If the cloud is not specified, this will only validate the cluster name
# against the regex, and the cloud-specific validation will be done by
# the spot controller when actually launching the spot cluster.
resources = list(task.resources)[0]
task_cloud = (resources.cloud
if resources.cloud is not None else clouds.Cloud)
task_cloud.check_cluster_name_is_valid(name)

sky.spot_launch(task,
name,
detach_run=detach_run,
Expand Down Expand Up @@ -3154,8 +3158,7 @@ def spot_queue(all: bool, refresh: bool, skip_finished: bool):
- CANCELLED: The job was cancelled by the user.

If the job failed, either due to user code or spot unavailability, the error
log can be found with ``sky logs sky-spot-controller-<user_hash> job_id``.
Please find your exact spot controller name with ``sky status``.
log can be found with ``sky spot logs --controller job_id``.
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved

(Tip) To fetch job statuses every 60 seconds, use ``watch``:

Expand Down Expand Up @@ -3288,12 +3291,24 @@ def spot_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool):
default=True,
help=('Follow the logs of the job. [default: --follow] '
'If --no-follow is specified, print the log so far and exit.'))
@click.option(
'--controller',
is_flag=True,
default=False,
help=('Show the controller logs of this job; useful for debugging '
'launching/recoveries, etc.'))
@click.argument('job_id', required=False, type=int)
@usage_lib.entrypoint
def spot_logs(name: Optional[str], job_id: Optional[int], follow: bool):
def spot_logs(name: Optional[str], job_id: Optional[int], follow: bool,
controller: bool):
"""Tail the log of a managed spot job."""
try:
core.spot_tail_logs(name=name, job_id=job_id, follow=follow)
if controller:
core.tail_logs(spot_lib.SPOT_CONTROLLER_NAME,
job_id=job_id,
follow=follow)
else:
core.spot_tail_logs(name=name, job_id=job_id, follow=follow)
except exceptions.ClusterNotUpError:
# Hint messages already printed by the call above.
sys.exit(1)
Expand Down
7 changes: 4 additions & 3 deletions sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,14 @@ def check_cluster_name_is_valid(cls, cluster_name: str) -> None:
'ensure it is fully matched by regex (e.g., '
'only contains lower letters, numbers and dash): '
f'{valid_regex}')
if max_cluster_name_len_limit is not None and len(
cluster_name) > max_cluster_name_len_limit:
if (max_cluster_name_len_limit is not None and
len(cluster_name) > max_cluster_name_len_limit):
cloud_name = '' if cls is Cloud else f' on {cls._REPR}'
with ux_utils.print_exception_no_traceback():
raise exceptions.InvalidClusterNameError(
f'Cluster name {cluster_name!r} has {len(cluster_name)} '
'chars; maximum length is '
f'{max_cluster_name_len_limit} chars.')
f'{max_cluster_name_len_limit} chars{cloud_name}.')

def __repr__(self):
return self._REPR
28 changes: 27 additions & 1 deletion sky/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Exceptions."""
import enum
from typing import List, Optional

# Return code for keyboard interruption and SIGTSTP
KEYBOARD_INTERRUPT_CODE = 130
Expand All @@ -10,9 +11,34 @@
class ResourcesUnavailableError(Exception):
"""Raised when resources are unavailable."""

def __init__(self, *args: object, no_failover: bool = False) -> None:
def __init__(self,
*args: object,
no_failover: bool = False,
failover_history: Optional[List[Exception]] = None) -> None:
super().__init__(*args)
self.no_failover = no_failover
# Mapping from exception type to reason for failover.
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
if failover_history is None:
failover_history = []
# Copy the list to avoid modifying from outside.
self.failover_history: List[Exception] = list(failover_history)

def with_failover_history(self, failover_history: List[Exception]) -> None:
# Copy the list to avoid modifying from outside.
self.failover_history = list(failover_history)
return self


class SpotJobFailedBeforeProvisionError(Exception):
"""Raised when a spot job fails before provision.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add #1655 (comment)? E.g.,

...

This is only raised by the spot controller process (`recovery_strategy`) when one of the following happens:

  - The optimizer cannot find feasible resources: e.g., this includes the case where a maximum number of retries is attempted and the launch still failed, corresponding to the case above where the optimizer cannot find any more feasible resources. 
  - or none of the exceptions in failover history are because of resources unavailability returned from an actual provision request.



This exception differs from a ResourcesUnavailableError with an empty failover_history, because the latter will only happen when ....

Not sure these are correct; please check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May need updates after offline discussions.

Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

Args:
reason: (Exception) The reason why the job fails.
"""

def __init__(self, *args: object, reason: Exception) -> None:
super().__init__(*args)
self.reason = reason


class ResourcesMismatchError(Exception):
Expand Down
12 changes: 12 additions & 0 deletions sky/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ def optimize(
minimize=OptimizeTarget.COST,
blocked_resources: Optional[List[resources_lib.Resources]] = None,
quiet: bool = False):
"""Find the best execution plan for the given DAG.

Args:
dag: the DAG to optimize.
minimize: whether to minimize cost or time.
blocked_resources: a list of resources that should not be used.
quiet: whether to suppress logging.

Raises:
exceptions.ResourcesUnavailableError: if no resources are available
for a task.
"""
# This function is effectful: mutates every node in 'dag' by setting
# node.best_resources if it is None.
Optimizer._add_dummy_source_sink_nodes(dag)
Expand Down
4 changes: 3 additions & 1 deletion sky/setup_files/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ def parse_readme(readme: str) -> str:
'oauth2client',
'pandas',
'pendulum',
'PrettyTable',
# PrettyTable with version >=2.0.0 is required for the support of
# `add_rows` method.
'PrettyTable>=2.0.0',
# Lower local ray version is not fully supported, due to the
# autoscaler issues (also tracked in #537).
'ray[default]>=1.9.0,<=2.2.0',
Expand Down
58 changes: 47 additions & 11 deletions sky/spot/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import time
import traceback

import colorama
import filelock

import sky
Expand Down Expand Up @@ -60,7 +59,15 @@ def __init__(self, job_id: int, task_yaml: str,
self._cluster_name, self._backend, self._task, retry_until_up)

def _run(self):
"""Busy loop monitoring spot cluster status and handling recovery."""
"""Busy loop monitoring spot cluster status and handling recovery.

Raises:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
exceptions.ResourcesUnavailableError: if the spot cluster fails
to be launched or the job fails to be submitted to the cluster.
concretevitamin marked this conversation as resolved.
Show resolved Hide resolved
This will happen iff none of the failover are due to resources
unavailability, or retry_until_up is False and we've reached the
maximum number of retries.
"""
logger.info(f'Started monitoring spot task {self._task_name} '
f'(id: {self._job_id})')
spot_state.set_starting(self._job_id)
Expand Down Expand Up @@ -134,11 +141,16 @@ def _run(self):
None,
spot_job_id=self._job_id)
logger.info(f'\n== End of logs (ID: {self._job_id}) ==')
status_to_set = spot_state.SpotStatus.FAILED
spot_status_to_set = spot_state.SpotStatus.FAILED
if job_status == job_lib.JobStatus.FAILED_SETUP:
status_to_set = spot_state.SpotStatus.FAILED_SETUP
spot_status_to_set = spot_state.SpotStatus.FAILED_SETUP
failure_reason = (
'To see the details, run: '
f'sky spot logs --controller {self._job_id}')
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved

spot_state.set_failed(self._job_id,
failure_type=status_to_set,
failure_type=spot_status_to_set,
failure_reason=failure_reason,
end_time=end_time)
break
# Although the cluster is healthy, we fail to access the
Expand Down Expand Up @@ -174,16 +186,37 @@ def run(self):
# Kill the children processes launched by log_lib.run_with_log.
subprocess_utils.kill_children_processes()
spot_state.set_cancelled(self._job_id)
except exceptions.SpotJobFailedBeforeProvisionError as e:
# The exception will be caught when:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
# None of the failovers are caused by resource unavailability;
# i.e., they are caused by errors before actual provisioning,
# e.g., InvalidClusterNameError, NotSupportedError,
# CloudUserIdentityError, etc.
logger.error(common_utils.format_exception(e.reason))
spot_state.set_failed(
self._job_id,
failure_type=spot_state.SpotStatus.FAILED_OTHER_REASON,
failure_reason=common_utils.format_exception(e.reason))
except exceptions.ResourcesUnavailableError as e:
logger.error(f'{common_utils.class_fullname(e.__class__)}: '
f'{colorama.Fore.RED}{e}{colorama.Style.RESET_ALL}')
# The exception will be caught when:
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
# The strategy_executor fails to launch/recover the cluster
# after the max number of retries when retry_until_up is not set.
logger.error(common_utils.format_exception(e))
# The spot job should be marked as FAILED_NO_RESOURCE, as the
# spot job may be able to launch next time.
spot_state.set_failed(
self._job_id,
failure_type=spot_state.SpotStatus.FAILED_NO_RESOURCE)
failure_type=spot_state.SpotStatus.FAILED_NO_RESOURCE,
failure_reason=common_utils.format_exception(e))
except (Exception, SystemExit) as e: # pylint: disable=broad-except
logger.error(traceback.format_exc())
logger.error('Unexpected error occurred: '
f'{common_utils.format_exception(e)}')
msg = ('Unexpected error occurred: '
f'{common_utils.format_exception(e, use_bracket=True)}')
logger.error(msg)
spot_state.set_failed(
self._job_id,
failure_type=spot_state.SpotStatus.FAILED_CONTROLLER,
failure_reason=msg)
finally:
self._strategy_executor.terminate_cluster()
job_status = spot_state.get_status(self._job_id)
Expand All @@ -193,7 +226,10 @@ def run(self):
logger.info(f'Previous spot job status: {job_status.value}')
spot_state.set_failed(
self._job_id,
failure_type=spot_state.SpotStatus.FAILED_CONTROLLER)
failure_type=spot_state.SpotStatus.FAILED_CONTROLLER,
failure_reason=(
'Unexpected error occurred. For details, '
f'run: sky spot logs --controller {self._job_id}'))

# Clean up Storages with persistent=False.
self._backend.teardown_ephemeral_storage(self._task)
Expand Down
Loading