Skip to content

Commit

Permalink
[Spot] Stop the cluster if the cancellation fails (#1998)
Browse files Browse the repository at this point in the history
* stop the cluster if the cancellation fails

* Allow cancel for partial cluster

* format

* terminate instead of stop

* format

* rename

* Address comments

* address comments

* format
  • Loading branch information
Michaelvll authored and concretevitamin committed Jun 4, 2023
1 parent 9477120 commit 6ee8002
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
9 changes: 6 additions & 3 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2287,7 +2287,8 @@ def check_cluster_available(
raise exceptions.ClusterNotUpError(
constants.UNINITIALIZED_ONPREM_CLUSTER_MESSAGE.format(
cluster_name),
cluster_status=cluster_status)
cluster_status=cluster_status,
handle=handle)
with ux_utils.print_exception_no_traceback():
hint_for_init = ''
if cluster_status == global_user_state.ClusterStatus.INIT:
Expand All @@ -2302,14 +2303,16 @@ def check_cluster_available(
f'{global_user_state.ClusterStatus.UP.value} clusters.'
f'{hint_for_init}'
f'{reset}',
cluster_status=cluster_status)
cluster_status=cluster_status,
handle=handle)

if handle.head_ip is None:
with ux_utils.print_exception_no_traceback():
raise exceptions.ClusterNotUpError(
f'Cluster {cluster_name!r} has been stopped or not properly '
'set up. Please re-launch it with `sky start`.',
cluster_status=cluster_status)
cluster_status=cluster_status,
handle=handle)
return handle


Expand Down
36 changes: 29 additions & 7 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,22 @@ def queue(cluster_name: str,

@usage_lib.entrypoint
# pylint: disable=redefined-builtin
def cancel(cluster_name: str,
all: bool = False,
job_ids: Optional[List[int]] = None) -> None:
def cancel(
cluster_name: str,
all: bool = False,
job_ids: Optional[List[int]] = None,
# pylint: disable=invalid-name
_try_cancel_if_cluster_is_init: bool = False,
) -> None:
# NOTE(dev): Keep the docstring consistent between the Python API and CLI.
"""Cancel jobs on a cluster.
Please refer to the sky.cli.cancel for the document.
Additional arguments:
_try_cancel_if_cluster_is_init: (bool) whether to try cancelling the job
even if the cluster is not UP, but the head node is still alive.
This is used by the spot controller to cancel the job when the
worker node is preempted in the spot cluster.
Raises:
ValueError: if arguments are invalid, or the cluster does not exist.
Expand All @@ -551,10 +560,23 @@ def cancel(cluster_name: str,
cluster_name, operation_str='Cancelling jobs')

# Check the status of the cluster.
handle = backend_utils.check_cluster_available(
cluster_name,
operation='cancelling jobs',
)
try:
handle = backend_utils.check_cluster_available(
cluster_name,
operation='cancelling jobs',
)
except exceptions.ClusterNotUpError as e:
if not _try_cancel_if_cluster_is_init:
raise
assert (e.handle is None or
isinstance(e.handle, backends.CloudVmRayResourceHandle)), e
if (e.handle is None or e.handle.head_ip is None):
raise
# Even if the cluster is not UP, we can still try to cancel the job if
# the head node is still alive. This is useful when a spot cluster's
# worker node is preempted, but we can still cancel the job on the head
# node.

backend = backend_utils.get_backend_from_handle(handle)

if all:
Expand Down
10 changes: 6 additions & 4 deletions sky/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

if typing.TYPE_CHECKING:
from sky import global_user_state
from sky.backends import backend

# Return code for keyboard interruption and SIGTSTP
KEYBOARD_INTERRUPT_CODE = 130
Expand Down Expand Up @@ -94,12 +95,13 @@ def __init__(self, returncode: int, command: str, error_msg: str) -> None:
class ClusterNotUpError(Exception):
"""Raised when a cluster is not up."""

def __init__(
self, message: str,
cluster_status: Optional['global_user_state.ClusterStatus']
) -> None:
def __init__(self,
message: str,
cluster_status: Optional['global_user_state.ClusterStatus'],
handle: Optional['backend.ResourceHandle'] = None) -> None:
super().__init__(message)
self.cluster_status = cluster_status
self.handle = handle


class ClusterSetUpError(Exception):
Expand Down
36 changes: 30 additions & 6 deletions sky/spot/recovery_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,37 @@ def _try_cancel_all_jobs(self):
return
try:
usage_lib.messages.usage.set_internal()
sky.cancel(cluster_name=self.cluster_name, all=True)
# Note that `sky.cancel()` may not go through for a variety of
# reasons:
# (1) head node is preempted; or
# (2) somehow user programs escape the cancel codepath's kill.
# The latter is silent and is a TODO.
#
# For the former, an exception will be thrown, in which case we
# fallback to terminate_cluster() in the except block below. This
# is because in the event of recovery on the same set of remaining
# worker nodes, we don't want to leave some old job processes
# running.
# TODO(zhwu): This is non-ideal and we should figure out another way
# to reliably cancel those processes and not have to down the
# remaining nodes first.
#
# In the case where the worker node is preempted, the `sky.cancel()`
# should be functional with the `_try_cancel_if_cluster_is_init`
# flag, i.e. it sends the cancel signal to the head node, which will
# then kill the user process on remaining worker nodes.
sky.cancel(cluster_name=self.cluster_name,
all=True,
_try_cancel_if_cluster_is_init=True)
except Exception as e: # pylint: disable=broad-except
# Ignore the failure as the cluster can be totally stopped, and the
# job canceling can get connection error.
logger.info('Ignoring the job cancellation failure; '
'the spot cluster is likely completely stopped.'
f'\n Detailed exception: {e}')
logger.info(
'Failed to cancel the job on the cluster. The cluster '
'might be already down or the head node is preempted.'
'\n Detailed exception: '
f'{common_utils.format_exception(e)}\n'
'Terminating the cluster again to make sure there is no '
'remaining job on the worker nodes.')
terminate_cluster(self.cluster_name)

def _wait_until_job_starts_on_cluster(self) -> Optional[float]:
"""Wait for MAX_JOB_CHECKING_RETRY times until job starts on the cluster
Expand Down

0 comments on commit 6ee8002

Please sign in to comment.