Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot] Let cancel interrupt the spot job (#1414) #1433

Merged
merged 35 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c3a8599
Let cancel interrupt the job
Michaelvll Nov 14, 2022
169e8b5
Add test
Michaelvll Nov 14, 2022
3e8bac6
Fix test
Michaelvll Nov 14, 2022
e82d89c
Cancel early
Michaelvll Nov 14, 2022
c081c60
fix test
Michaelvll Nov 14, 2022
3441b43
fix test
Michaelvll Nov 14, 2022
88f4db5
Fix exceptions
Michaelvll Nov 14, 2022
331fa32
pass test
Michaelvll Nov 14, 2022
25c568c
increase waiting time
Michaelvll Nov 15, 2022
71a253d
address comments
Michaelvll Nov 17, 2022
5253d5d
add job id
Michaelvll Nov 17, 2022
6e9ba0c
remove 'auto' in ray.init
Michaelvll Nov 17, 2022
7bffd84
Fix serialization problem
Michaelvll Nov 19, 2022
a5e7b20
refactor a bit
Michaelvll Nov 19, 2022
0b66584
Fix
Michaelvll Nov 19, 2022
aa6dd91
Add comments
Michaelvll Nov 19, 2022
3065fed
format
Michaelvll Nov 19, 2022
5f0d801
pylint
Michaelvll Nov 19, 2022
7875d35
revert a format change
Michaelvll Nov 19, 2022
f7b4f8b
Add docstr
Michaelvll Nov 19, 2022
3cfa747
Move ray.init
Michaelvll Nov 19, 2022
c140801
replace ray with multiprocess.Process
Michaelvll Nov 20, 2022
256d1f9
Add test for setup cancelation
Michaelvll Nov 20, 2022
3feb30f
Fix logging
Michaelvll Nov 20, 2022
8f469a5
Fix test
Michaelvll Nov 20, 2022
ba0f7b7
lint
Michaelvll Nov 20, 2022
af72709
Use SIGTERM instead
Michaelvll Nov 20, 2022
0556774
format
Michaelvll Nov 20, 2022
98db4a6
Change exception type
Michaelvll Nov 20, 2022
1338a22
revert to KeyboardInterrupt
Michaelvll Nov 20, 2022
76b62fb
remove
Michaelvll Nov 20, 2022
8985d73
Fix test
Michaelvll Nov 21, 2022
d93c5f6
fix test
Michaelvll Nov 21, 2022
832bde1
fix test
Michaelvll Nov 22, 2022
2e3fbe4
typo
Michaelvll Nov 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/managed_spot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ run: |
import time
import tqdm

for i in tqdm.trange(120):
for i in tqdm.trange(1200):
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
time.sleep(1)

EOF
18 changes: 2 additions & 16 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import jinja2
import jsonschema
from packaging import version
import psutil
import requests
from requests import adapters
from requests.packages.urllib3.util import retry as retry_lib
Expand Down Expand Up @@ -2002,23 +2001,10 @@ def check_gcp_cli_include_tpu_vm() -> None:
' TPU VM APIs, check "gcloud version" for details.')


def kill_children_processes():
# We need to kill the children, so that the underlying subprocess
# will not print the logs to the terminal, after this program
# exits.
parent_process = psutil.Process()
for child in parent_process.children(recursive=True):
try:
child.terminate()
except psutil.NoSuchProcess:
# The child process may have already been terminated.
pass


# Handle ctrl-c
def interrupt_handler(signum, frame):
del signum, frame
kill_children_processes()
subprocess_utils.kill_children_processes()
# Avoid using logger here, as it will print the stack trace for broken
# pipe, when the output is piped to another program.
print(f'{colorama.Style.DIM}Tip: The job will keep '
Expand All @@ -2030,7 +2016,7 @@ def interrupt_handler(signum, frame):
# Handle ctrl-z
def stop_handler(signum, frame):
del signum, frame
kill_children_processes()
subprocess_utils.kill_children_processes()
# Avoid using logger here, as it will print the stack trace for broken
# pipe, when the output is piped to another program.
print(f'{colorama.Style.DIM}Tip: The job will keep '
Expand Down
75 changes: 42 additions & 33 deletions sky/spot/controller.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Controller: handles the life cycle of a managed spot cluster (job)."""
import argparse
import multiprocessing
import pathlib
import signal
import time
import traceback

import colorama
import filelock
import ray

import sky
from sky import exceptions
Expand All @@ -20,6 +21,7 @@
from sky.spot import spot_state
from sky.spot import spot_utils
from sky.utils import common_utils
from sky.utils import subprocess_utils

logger = sky_logging.init_logger(__name__)

Expand Down Expand Up @@ -147,9 +149,9 @@ def run(self):
"""Run controller logic and handle exceptions."""
try:
self._run()
except KeyboardInterrupt as e:
# ray.cancel will raise KeyboardInterrupt.
logger.error(e)
except KeyboardInterrupt:
# Kill the children processes launched by log_lib.run_with_log.
subprocess_utils.kill_children_processes()
spot_state.set_cancelled(self._job_id)
except exceptions.ResourcesUnavailableError as e:
logger.error(f'Resources unavailable: {colorama.Fore.RED}{e}'
Expand All @@ -175,9 +177,18 @@ def run(self):
self._backend.teardown_ephemeral_storage(self._task)


@ray.remote(num_cpus=0)
def _run_controller(job_id: int, task_yaml: str, retry_until_up: bool):
"""Runs the controller in a remote process for interruption."""

# Override the SIGTERM handler to gracefully terminate the controller.
def handle_interupt(signum, frame):
"""Handle the interrupt signal."""
# Need to raise KeyboardInterrupt to avoid the exception being caught by
# the strategy executor.
raise KeyboardInterrupt()

signal.signal(signal.SIGTERM, handle_interupt)

# The controller needs to be instantiated in the remote process, since
# the controller is not serializable.
spot_controller = SpotController(job_id, task_yaml, retry_until_up)
Expand All @@ -187,7 +198,7 @@ def _run_controller(job_id: int, task_yaml: str, retry_until_up: bool):
def _handle_signal(job_id):
"""Handle the signal if the user sent it."""
signal_file = pathlib.Path(spot_utils.SIGNAL_FILE_PREFIX.format(job_id))
signal = None
user_signal = None
if signal_file.exists():
# Filelock is needed to prevent race condition with concurrent
# signal writing.
Expand All @@ -196,49 +207,47 @@ def _handle_signal(job_id):
# pylint: disable=abstract-class-instantiated
with filelock.FileLock(str(signal_file) + '.lock'):
with signal_file.open(mode='r') as f:
signal = f.read().strip()
user_signal = f.read().strip()
try:
signal = spot_utils.UserSignal(signal)
user_signal = spot_utils.UserSignal(user_signal)
except ValueError:
logger.warning(
f'Unknown signal received: {signal}. Ignoring.')
signal = None
f'Unknown signal received: {user_signal}. Ignoring.')
user_signal = None
# Remove the signal file, after reading the signal.
signal_file.unlink()
if signal is None:
if user_signal is None:
# None or empty string.
return
assert signal == spot_utils.UserSignal.CANCEL, (
f'Only cancel signal is supported, but {signal} got.')
raise exceptions.SpotUserCancelledError(f'User sent {signal.value} signal.')
assert user_signal == spot_utils.UserSignal.CANCEL, (
f'Only cancel signal is supported, but {user_signal} got.')
raise exceptions.SpotUserCancelledError(
f'User sent {user_signal.value} signal.')


def start(job_id, task_yaml, retry_until_up):
"""Start the controller."""
ray.init()
controller_task = None
controller_process = None
try:
_handle_signal(job_id)
controller_task = _run_controller.remote(job_id, task_yaml,
retry_until_up)
# Signal can interrupt the underlying controller process.
ready, _ = ray.wait([controller_task], timeout=0)
while not ready:
controller_process = multiprocessing.Process(target=_run_controller,
args=(job_id, task_yaml,
retry_until_up))
controller_process.start()
while controller_process.is_alive():
_handle_signal(job_id)
ready, _ = ray.wait([controller_task], timeout=1)
time.sleep(1)
except exceptions.SpotUserCancelledError:
logger.info(f'Cancelling spot job {job_id}...')
try:
if controller_task is not None:
# This will raise KeyboardInterrupt in the task.
ray.cancel(controller_task)
ray.get(controller_task)
except ray.exceptions.RayTaskError:
# When the controller task is cancelled, it will raise
# ray.exceptions.RayTaskError, which can be ignored,
# since the SpotUserCancelledError will be raised and
# handled later.
pass
if controller_process is not None:
logger.info('sending SIGTERM to controller process '
Michaelvll marked this conversation as resolved.
Show resolved Hide resolved
f'{controller_process.pid}')
# This will raise KeyboardInterrupt in the task.
# Using SIGTERM instead of SIGINT, as the SIGINT is weirdly ignored
# by the controller process when it is started inside a ray job.
controller_process.terminate()
if controller_process is not None:
controller_process.join()


if __name__ == '__main__':
Expand Down
3 changes: 1 addition & 2 deletions sky/spot/spot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ def cancel_job_by_name(job_name: str) -> str:
f'with name {job_name!r}.\n'
f'Job IDs: {job_ids}{colorama.Style.RESET_ALL}')
cancel_jobs_by_id(job_ids)
return (f'Job {job_name!r} is scheduled to be cancelled within '
f'{JOB_STATUS_CHECK_GAP_SECONDS} seconds.')
return f'Job {job_name!r} is scheduled to be cancelled.'


def stream_logs_by_id(job_id: int, follow: bool = True) -> str:
Expand Down
14 changes: 14 additions & 0 deletions sky/utils/subprocess_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Utility functions for subprocesses."""
from multiprocessing import pool
import psutil
import subprocess
from typing import Any, Callable, List, Optional, Union

Expand Down Expand Up @@ -74,3 +75,16 @@ def handle_returncode(returncode: int,
f'{colorama.Fore.RED}{error_msg}{colorama.Style.RESET_ALL}')
with ux_utils.print_exception_no_traceback():
raise exceptions.CommandError(returncode, command, format_err_msg)


def kill_children_processes():
    """Request termination of every descendant of the current process.

    The descendants are terminated so that the underlying subprocesses do
    not keep printing logs to the terminal after this program exits.
    """
    descendants = psutil.Process().children(recursive=True)
    for proc in descendants:
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            # The process exited on its own before we reached it.
            continue
41 changes: 28 additions & 13 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def run_one_test(test: Test) -> Tuple[int, str, str]:
stdout=log_file,
stderr=subprocess.STDOUT,
shell=True,
executable='/bin/bash',
)
try:
proc.wait(timeout=test.timeout)
Expand Down Expand Up @@ -860,38 +861,52 @@ def test_spot_cancellation():
test = Test(
'managed-spot-cancellation',
[
# Test cancellation while the spot cluster is being launched.
f'sky spot launch --cloud aws --region {region} -n {name} "sleep 1000" -y -d',
'sleep 60',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name} | head -n1 | grep "STARTING"',
# Test cancelling the spot job during launching.
f'sky spot cancel -y -n {name}',
'sleep 5',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name} | head -n1 | grep "CANCELLED"',
'sleep 100',
(f'aws ec2 describe-instances --region {region} '
(f's=$(aws ec2 describe-instances --region {region} '
f'--filters Name=tag:ray-cluster-name,Values={name}* '
f'--query Reservations[].Instances[].State[].Name '
'--output text | grep terminated'),
# Test cancelling the spot job during running.
f'sky spot launch --cloud aws --region {region} -n {name}-2 "sleep 1000" -y -d',
'--output text) && printf "$s" && echo; [[ -z "$s" ]] || [[ "$s" = "terminated" ]] || [[ "$s" = "shutting-down" ]]'
),
# Test cancelling the spot cluster while the spot job is being set up.
f'sky spot launch --cloud aws --region {region} -n {name}-2 tests/test_yamls/long_setup.yaml -y -d',
'sleep 300',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name} | head -n1 | grep "RUNNING"',
f'sky spot cancel -y -n {name}-2',
'sleep 5',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-2 | head -n1 | grep "CANCELLED"',
'sleep 100',
(f's=$(aws ec2 describe-instances --region {region} '
f'--filters Name=tag:ray-cluster-name,Values={name}-2* '
f'--query Reservations[].Instances[].State[].Name '
'--output text) && printf "$s" && echo; [[ -z "$s" ]] || [[ "$s" = "terminated" ]] || [[ "$s" = "shutting-down" ]]'
),
# Test cancellation while the spot job is recovering.
f'sky spot launch --cloud aws --region {region} -n {name}-3 "sleep 1000" -y -d',
'sleep 300',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-3 | head -n1 | grep "RUNNING"',
# Terminate the cluster manually.
(f'aws ec2 terminate-instances --region {region} --instance-ids $('
f'aws ec2 describe-instances --region {region} '
f'--filters Name=tag:ray-cluster-name,Values={name}-2* '
f'--filters Name=tag:ray-cluster-name,Values={name}-3* '
f'--query Reservations[].Instances[].InstanceId '
'--output text)'),
'sleep 50',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-2 | head -n1 | grep "RECOVERING"',
f'sky spot cancel -y -n {name}-2',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-3 | head -n1 | grep "RECOVERING"',
f'sky spot cancel -y -n {name}-3',
'sleep 10',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-2 | head -n1 | grep "CANCELLED"',
f's=$(sky spot queue); printf "$s"; echo; echo; printf "$s" | grep {name}-3 | head -n1 | grep "CANCELLED"',
'sleep 90',
(f'aws ec2 describe-instances --region {region} '
f'--filters Name=tag:ray-cluster-name,Values={name}-2* '
(f's=$(aws ec2 describe-instances --region {region} '
f'--filters Name=tag:ray-cluster-name,Values={name}-3* '
f'--query Reservations[].Instances[].State[].Name '
'--output text | grep terminated'),
'--output text) && printf "$s" && echo; [[ -z "$s" ]] || [[ "$s" = "terminated" ]] || [[ "$s" = "shutting-down" ]]'
),
])
run_one_test(test)

Expand Down
9 changes: 9 additions & 0 deletions tests/test_yamls/long_setup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
setup: |
echo long setup
for i in {1..10000}; do
echo $i
sleep 1
done

run: |
echo run