[Core] Make multi-node job fail fast when one fails, and output segment fault (#3081)

* [minor] Make job scheduling more efficient and output segment fault

* remove additional space

* fail early for multiple tasks

* fix

* Fix

* address comments

* add smoke tests

* Add setup IP and ranks

* Fix returncodes order

* Add todo

* Add comment

* Add comment back

* fix returncodes

* remove print

* Add todo in smoke test

* Fix failed run yaml

* Address comments

* use run_timestamp

* Update sky/backends/cloud_vm_ray_backend.py

Co-authored-by: Zongheng Yang <zongheng.y@gmail.com>

* address comments

* mypy

* format

* address comments

---------

Co-authored-by: Zongheng Yang <zongheng.y@gmail.com>
Michaelvll and concretevitamin authored Feb 9, 2024
1 parent d2b2118 commit b042741
Showing 6 changed files with 198 additions and 84 deletions.
30 changes: 27 additions & 3 deletions docs/source/running-jobs/environment-variables.rst
@@ -88,7 +88,31 @@ it available in your current shell, then using ``--env`` to pass it to SkyPilot:
SkyPilot environment variables
------------------------------------------------------------------

SkyPilot exports these environment variables for a task's execution (while ``run`` commands are running):
SkyPilot exports these environment variables for a task's execution. The ``setup``
and ``run`` stages have different environment variables available.

Environment variables for ``setup``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


.. list-table::
:widths: 20 70 10
:header-rows: 1

* - Name
- Definition
- Example
* - ``SKYPILOT_SETUP_NODE_RANK``
- Rank (an integer ID from 0 to :code:`num_nodes-1`) of the node being set up.
- 0
* - ``SKYPILOT_SETUP_NODE_IPS``
- A string of IP addresses of the nodes in the cluster, in the same order as the node ranks, with one IP address per line.
- 1.2.3.4

Since setup commands always run on all nodes of a cluster, SkyPilot ensures both of these environment variables (the ranks and the IP list) never change across multiple setups on the same cluster.
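
For instance, a program launched from ``setup`` can read these variables with the
language's standard mechanism. A minimal sketch in Python (assuming nothing beyond
the variables documented above):

.. code-block:: python

    import os

    node_rank = int(os.environ['SKYPILOT_SETUP_NODE_RANK'])
    node_ips = os.environ['SKYPILOT_SETUP_NODE_IPS'].splitlines()
    head_ip = node_ips[0]  # Same order as ranks, so index 0 is the rank-0 node.
    print(f'Setting up node {node_rank} of {len(node_ips)}; head node: {head_ip}')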

Environment variables for ``run``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. list-table::
:widths: 20 70 10
Expand Down Expand Up @@ -120,6 +144,6 @@ The values of these variables are filled in by SkyPilot at task execution time.

You can access these variables in the following ways:

* In the task YAML's ``run`` commands (a Bash script), access them using the ``${MYVAR}`` syntax;
* In the program(s) launched in ``run``, access them using the
* In the task YAML's ``setup``/``run`` commands (a Bash script), access them using the ``${MYVAR}`` syntax;
* In the program(s) launched in ``setup``/``run``, access them using the
language's standard method (e.g., ``os.environ`` for Python).
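
For example, a Python program launched by ``run`` could read both a user-passed
variable and a SkyPilot-provided one (an illustrative sketch; ``MYVAR`` is the
user-defined variable from the example above):

.. code-block:: python

    import os

    myvar = os.environ.get('MYVAR', '')                    # passed via --env
    rank = int(os.environ.get('SKYPILOT_NODE_RANK', '0'))  # set by SkyPilot
    print(f'Node {rank} sees MYVAR={myvar!r}')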
197 changes: 118 additions & 79 deletions sky/backends/cloud_vm_ray_backend.py
@@ -265,6 +265,34 @@ def add_prologue(self, job_id: int, is_local: bool = False) -> None:
log_to_driver=True,
**kwargs
)
def get_or_fail(futures, pg) -> List[int]:
    \"\"\"Wait for tasks; if any fails, cancel all unready.\"\"\"
    returncodes = [1] * len(futures)
    # Wait for 1 task to be ready.
    ready, unready = ray.wait(futures)
    idx = futures.index(ready[0])
    returncodes[idx] = ray.get(ready[0])
    while unready:
        if returncodes[idx] != 0:
            for task in unready:
                # ray.cancel without force fails to kill tasks.
                # We use force=True to kill unready tasks.
                ray.cancel(task, force=True)
                # Use SIGKILL=128+9 to indicate the task is forcibly
                # killed.
                idx = futures.index(task)
                returncodes[idx] = 137
            break
        ready, unready = ray.wait(unready)
        idx = futures.index(ready[0])
        returncodes[idx] = ray.get(ready[0])
    # Remove the placement group after all tasks are done, so that the
    # next job can be scheduled on the released resources immediately.
    ray_util.remove_placement_group(pg)
    sys.stdout.flush()
    sys.stderr.flush()
    return returncodes
run_fn = None
futures = []
"""),
@@ -399,7 +427,7 @@ def add_gang_scheduling_placement_group_and_setup(
with_ray=True,
use_sudo={self.is_local},
) for i in range(total_num_nodes)]
setup_returncodes = ray.get(setup_workers)
setup_returncodes = get_or_fail(setup_workers, setup_pg)
if sum(setup_returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED_SETUP)
# This waits for all streaming logs to finish.
@@ -591,25 +619,32 @@ def add_epilogue(self) -> None:

self._code += [
textwrap.dedent(f"""\
returncodes = ray.get(futures)
returncodes = get_or_fail(futures, pg)
if sum(returncodes) != 0:
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.FAILED)
# This waits for all streaming logs to finish.
# Schedule the next pending job immediately to make the job
# scheduling more efficient.
job_lib.scheduler.schedule_step()
# This waits for all streaming logs to finish.
time.sleep(0.5)
reason = ''
# 139 is the return code of SIGSEGV, i.e. Segmentation Fault.
if any(r == 139 for r in returncodes):
reason = '(likely due to Segmentation Fault)'
print('ERROR: {colorama.Fore.RED}Job {self.job_id} failed with '
'return code list:{colorama.Style.RESET_ALL}',
returncodes,
reason,
file=sys.stderr,
flush=True)
# Need this to set the job status in ray job to be FAILED.
sys.exit(1)
else:
sys.stdout.flush()
sys.stderr.flush()
job_lib.set_status({self.job_id!r}, job_lib.JobStatus.SUCCEEDED)
# This waits for all streaming logs to finish.
# Schedule the next pending job immediately to make the job
# scheduling more efficient.
job_lib.scheduler.schedule_step()
# This waits for all streaming logs to finish.
time.sleep(0.5)
""")
]
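A quick note (not from the commit itself) on the return-code arithmetic above: a process terminated by a signal conventionally exits with 128 plus the signal number, which is where 139 (SIGSEGV) and 137 (SIGKILL) come from.

import signal

print(128 + signal.SIGSEGV)  # 139: segmentation fault
print(128 + signal.SIGKILL)  # 137: force-killed, e.g. by ray.cancel(force=True)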
@@ -3082,83 +3117,87 @@ def _setup(self, handle: CloudVmRayResourceHandle, task: task_lib.Task,

if task.setup is None:
return

setup_script = log_lib.make_task_bash_script(task.setup,
env_vars=task.envs)
with tempfile.NamedTemporaryFile('w', prefix='sky_setup_') as f:
f.write(setup_script)
f.flush()
setup_sh_path = f.name
setup_file = os.path.basename(setup_sh_path)
# Sync the setup script up and run it.
ip_list = handle.external_ips()
port_list = handle.external_ssh_ports()
assert ip_list is not None, 'external_ips is not cached in handle'
ssh_credentials = backend_utils.ssh_credential_from_yaml(
handle.cluster_yaml, handle.docker_user, handle.ssh_user)
# Disable connection sharing for setup script to avoid old
# connections being reused, which may cause stale ssh agent
# forwarding.
ssh_credentials.pop('ssh_control_name')
runners = command_runner.SSHCommandRunner.make_runner_list(
ip_list, port_list=port_list, **ssh_credentials)

# Need this `-i` option to make sure `source ~/.bashrc` work
setup_cmd = f'/bin/bash -i /tmp/{setup_file} 2>&1'

def _setup_node(runner: command_runner.SSHCommandRunner) -> None:
setup = task.setup
# Sync the setup script up and run it.
ip_list = handle.external_ips()
internal_ips = handle.internal_ips()
port_list = handle.external_ssh_ports()
assert ip_list is not None, 'external_ips is not cached in handle'
ssh_credentials = backend_utils.ssh_credential_from_yaml(
handle.cluster_yaml, handle.docker_user, handle.ssh_user)
# Disable connection sharing for setup script to avoid old
# connections being reused, which may cause stale ssh agent
# forwarding.
ssh_credentials.pop('ssh_control_name', None)

remote_setup_file_name = f'/tmp/sky_setup_{self.run_timestamp}'
# Need this `-i` option to make sure `source ~/.bashrc` works
setup_cmd = f'/bin/bash -i {remote_setup_file_name} 2>&1'

def _setup_node(node_id: int) -> None:
setup_envs = task.envs.copy()
setup_envs['SKYPILOT_SETUP_NODE_IPS'] = '\n'.join(internal_ips)
setup_envs['SKYPILOT_SETUP_NODE_RANK'] = str(node_id)
runner = command_runner.SSHCommandRunner(ip_list[node_id],
port=port_list[node_id],
**ssh_credentials)
setup_script = log_lib.make_task_bash_script(setup,
env_vars=setup_envs)
with tempfile.NamedTemporaryFile('w', prefix='sky_setup_') as f:
f.write(setup_script)
f.flush()
setup_sh_path = f.name
runner.rsync(source=setup_sh_path,
target=f'/tmp/{setup_file}',
target=remote_setup_file_name,
up=True,
stream_logs=False)
if detach_setup:
return
setup_log_path = os.path.join(self.log_dir,
f'setup-{runner.ip}.log')
returncode = runner.run(
setup_cmd,
log_path=setup_log_path,
process_stream=False,
)
if detach_setup:
return
setup_log_path = os.path.join(self.log_dir,
f'setup-{runner.ip}.log')
returncode = runner.run(
setup_cmd,
log_path=setup_log_path,
process_stream=False,
)

def error_message() -> str:
# Use the function to avoid tailing the file in success case
try:
last_10_lines = subprocess.run(
[
'tail', '-n10',
os.path.expanduser(setup_log_path)
],
stdout=subprocess.PIPE,
check=True).stdout.decode('utf-8')
except subprocess.CalledProcessError:
last_10_lines = None

err_msg = (
f'Failed to setup with return code {returncode}. '
f'Check the details in log: {setup_log_path}')
if last_10_lines:
err_msg += (
f'\n\n{colorama.Fore.RED}'
'****** START Last lines of setup output ******'
f'{colorama.Style.RESET_ALL}\n'
f'{last_10_lines}'
f'{colorama.Fore.RED}'
'******* END Last lines of setup output *******'
f'{colorama.Style.RESET_ALL}')
return err_msg

subprocess_utils.handle_returncode(returncode=returncode,
command=setup_cmd,
error_msg=error_message)

num_nodes = len(ip_list)
plural = 's' if num_nodes > 1 else ''
if not detach_setup:
logger.info(
f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
subprocess_utils.run_in_parallel(_setup_node, runners)
def error_message() -> str:
# Use the function to avoid tailing the file in the success case
try:
last_10_lines = subprocess.run(
['tail', '-n10',
os.path.expanduser(setup_log_path)],
stdout=subprocess.PIPE,
check=True).stdout.decode('utf-8')
except subprocess.CalledProcessError:
last_10_lines = None

err_msg = (f'Failed to setup with return code {returncode}. '
f'Check the details in log: {setup_log_path}')
if last_10_lines:
err_msg += (f'\n\n{colorama.Fore.RED}'
'****** START Last lines of setup output ******'
f'{colorama.Style.RESET_ALL}\n'
f'{last_10_lines}'
f'{colorama.Fore.RED}'
'******* END Last lines of setup output *******'
f'{colorama.Style.RESET_ALL}')
return err_msg

subprocess_utils.handle_returncode(returncode=returncode,
command=setup_cmd,
error_msg=error_message)

num_nodes = len(ip_list)
plural = 's' if num_nodes > 1 else ''
if not detach_setup:
logger.info(f'{fore.CYAN}Running setup on {num_nodes} node{plural}.'
f'{style.RESET_ALL}')
# TODO(zhwu): run_in_parallel uses multiple threads to run the commands,
# which can cause the program to wait for all the threads to finish, even
# if some of them raise exceptions. We should replace it with
# multiprocessing.
subprocess_utils.run_in_parallel(_setup_node, range(num_nodes))

if detach_setup:
# Only set this when setup needs to be run outside the self._setup()
4 changes: 2 additions & 2 deletions sky/utils/subprocess_utils.py
@@ -4,7 +4,7 @@
import random
import subprocess
import time
from typing import Any, Callable, List, Optional, Tuple, Union
from typing import Any, Callable, Iterable, List, Optional, Tuple, Union

import colorama
import psutil
@@ -50,7 +50,7 @@ def get_parallel_threads() -> int:
return max(4, cpu_count - 1)


def run_in_parallel(func: Callable, args: List[Any]) -> List[Any]:
def run_in_parallel(func: Callable, args: Iterable[Any]) -> List[Any]:
"""Run a function in parallel on a list of arguments.
The function should raise a CommandError if the command fails.
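For reference, a thread-pool-based `run_in_parallel` might look roughly like the sketch below (a hedged approximation rather than SkyPilot's exact implementation; the hunk above only widens the `args` annotation from `List` to `Iterable`). It reuses `get_parallel_threads` from this module.

from multiprocessing import pool
from typing import Any, Callable, Iterable, List

def run_in_parallel(func: Callable, args: Iterable[Any]) -> List[Any]:
    """Run func over args with a thread pool and collect the results in order."""
    # Threads (not separate processes) are used, which is why a hard failure in
    # one call cannot easily interrupt its siblings (see the TODO noted above).
    with pool.ThreadPool(processes=get_parallel_threads()) as p:
        return list(p.imap(func, args))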
24 changes: 24 additions & 0 deletions tests/test_smoke.py
@@ -1505,6 +1505,30 @@ def test_multi_hostname(generic_cloud: str):
run_one_test(test)


@pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
def test_multi_node_failure(generic_cloud: str):
    name = _get_cluster_name()
    test = Test(
        'multi_node_failure',
        [
            # TODO(zhwu): we use multiple threads to run the setup commands
            # in parallel, which makes it impossible to fail fast when one
            # of the nodes fails. We should fix this in the future.
            # The --detach-setup version can fail fast, as the setup is
            # submitted to the remote machine, which does not use multiple
            # threads. Refer to the comment in
            # `subprocess_utils.run_in_parallel`.
            # f'sky launch -y -c {name} --cloud {generic_cloud} tests/test_yamls/failed_worker_setup.yaml && exit 1', # Ensure the job setup failed.
            f'sky launch -y -c {name} --cloud {generic_cloud} --detach-setup tests/test_yamls/failed_worker_setup.yaml',
            f'sky logs {name} 1 --status | grep FAILED_SETUP',  # Ensure the job setup failed.
            f'sky exec {name} tests/test_yamls/failed_worker_run.yaml',
            f'sky logs {name} 2 --status | grep FAILED',  # Ensure the job failed.
            f'sky logs {name} 2 | grep "My hostname:" | wc -l | grep 2',  # Ensure that 2 of the hosts printed their hostnames.
        ],
        f'sky down -y {name}',
    )
    run_one_test(test)


# ---------- Web apps with custom ports on GCP. ----------
@pytest.mark.gcp
def test_gcp_http_server_with_custom_ports():
11 changes: 11 additions & 0 deletions tests/test_yamls/failed_worker_run.yaml
@@ -0,0 +1,11 @@
resources:
  cpus: 2+

num_nodes: 3

run: |
  if [ "$SKYPILOT_NODE_RANK" == "1" ]; then
    exit 1
  fi
  echo My hostname: $(hostname)
  sleep 10000
16 changes: 16 additions & 0 deletions tests/test_yamls/failed_worker_setup.yaml
@@ -0,0 +1,16 @@
resources:
  cpus: 2+

num_nodes: 3

setup: |
  echo "Setting up nodes"
  echo "$SKYPILOT_SETUP_NODE_RANK"
  if [ "$SKYPILOT_SETUP_NODE_RANK" == "1" ]; then
    echo FAILING $SKYPILOT_SETUP_NODE_RANK
    exit 1
  fi
  sleep 10000
run: |
  echo Should not get here
