Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve UX for logs to include SSH name and rank #1380

Merged
merged 39 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ed3cf06
Messy WIP
concretevitamin Nov 4, 2022
ad61a1e
Fixes two more yamls
concretevitamin Nov 5, 2022
3d3da86
Improve log UX and ensure stableness
iojw Nov 6, 2022
ead021b
Remove print statement
iojw Nov 6, 2022
9d6aeb1
Remove task name from logs
iojw Nov 6, 2022
531171f
Fix name for single-node tasks
iojw Nov 6, 2022
4f49828
Update var names and comments for clarity
iojw Nov 7, 2022
fed1dd4
Update logic for single and multi-node clusters
iojw Nov 7, 2022
c7ae34f
Cache stable cluster IP list in ResourceHandle
iojw Nov 7, 2022
31a9faa
Properly cache and invalidate stable list
iojw Nov 7, 2022
2e3523a
Merge branch 'master' into worker-id
iojw Nov 7, 2022
a5ab207
Merge branch 'master' into worker-id
iojw Nov 7, 2022
f7ec774
Add back SKYPILOT_NODE_IPS
iojw Nov 7, 2022
965a1ab
Update log file name
iojw Nov 8, 2022
ac6c4c9
Refactor backend to use cached stable IP list
iojw Nov 11, 2022
396eb09
Fix spot test
iojw Nov 11, 2022
800f5a0
Fix formatting
iojw Nov 13, 2022
dffd39f
Refactor ResourceHandle
iojw Nov 14, 2022
2edbd2c
Fixes for correctness
iojw Nov 14, 2022
f7120a7
Remove unneeded num_nodes arg
iojw Nov 19, 2022
437edf9
Merge branch 'master' into worker-id
iojw Nov 19, 2022
64a39d9
Fix _gang_schedule_ray_up
iojw Nov 19, 2022
511bca9
Ensure stable IP list is cached
iojw Nov 20, 2022
fe98d95
Formatting fixes
iojw Nov 22, 2022
5580abd
Refactor updating stable IPs to be part of handle
iojw Nov 22, 2022
ad82b80
Merge max attempts constant
iojw Nov 22, 2022
20e6e5a
Fix ordering for setting TPU name
iojw Nov 22, 2022
7a38ccd
Fix bugs and clean up code
iojw Nov 28, 2022
1ac6eb4
Fix backwards compatibility
iojw Nov 28, 2022
a7b812c
Fix bug with old autostopped clusters
iojw Nov 29, 2022
28ad6ea
Fix comment
iojw Nov 29, 2022
621aff9
Merge branch 'master' into worker-id
iojw Nov 29, 2022
2625788
Fix assertion statement
iojw Nov 29, 2022
2300b10
Update assertion message
iojw Nov 30, 2022
66986e9
Fix linting
iojw Nov 30, 2022
3ca0e56
Fix retrieving IPs for TPU vm
iojw Nov 30, 2022
19228f6
Add optimization for updating IPs
iojw Nov 30, 2022
c5ade55
Linting fix
iojw Nov 30, 2022
26ead20
Update comment
iojw Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ def add_cluster(

Args:
cluster_name: Cluster name (see `sky status`)
ips: List of IP addresses in the cluster. First IP is head node.
ips: List of public IP addresses in the cluster. First IP is head
node.
auth_config: read_yaml(handle.cluster_yaml)['auth']
"""
username = auth_config['ssh_user']
Expand Down Expand Up @@ -468,7 +469,7 @@ def add_cluster(
def _add_multinode_config(
cls,
cluster_name: str,
worker_ips: List[str],
external_worker_ips: List[str],
auth_config: Dict[str, str],
):
username = auth_config['ssh_user']
Expand All @@ -477,13 +478,17 @@ def _add_multinode_config(
sky_autogen_comment = ('# Added by sky (use `sky stop/down '
f'{cluster_name}` to remove)')

overwrites = [False] * len(worker_ips)
overwrite_begin_idxs = [None] * len(worker_ips)
codegens = [None] * len(worker_ips)
# Keep the worker-<i> aliases stable across calls by sorting the workers
# by their public IPs.
external_worker_ips = list(sorted(external_worker_ips))

overwrites = [False] * len(external_worker_ips)
overwrite_begin_idxs = [None] * len(external_worker_ips)
codegens = [None] * len(external_worker_ips)
worker_names = []
extra_path_name = cls.ssh_multinode_path.format(cluster_name)

for idx in range(len(worker_ips)):
for idx in range(len(external_worker_ips)):
worker_names.append(cluster_name + f'-worker{idx+1}')

config_path = os.path.expanduser(cls.ssh_conf_path)
Expand Down Expand Up @@ -527,11 +532,11 @@ def _add_multinode_config(
prev_line = config[i - 1] if i > 0 else ''
logger.warning(f'{cls.ssh_conf_path} contains '
f'host named {worker_names[idx]}.')
host_name = worker_ips[idx]
host_name = external_worker_ips[idx]
logger.warning(f'Using {host_name} to identify host instead.')
codegens[idx] = cls._get_generated_config(
sky_autogen_comment, host_name, worker_ips[idx], username,
key_path)
sky_autogen_comment, host_name, external_worker_ips[idx],
username, key_path)

# All workers go to SKY_USER_FILE_PATH/ssh/{cluster_name}
for i, line in enumerate(extra_config):
Expand All @@ -543,17 +548,17 @@ def _add_multinode_config(
overwrites[idx] = True
overwrite_begin_idxs[idx] = i - 1
codegens[idx] = cls._get_generated_config(
sky_autogen_comment, host_name, worker_ips[idx], username,
key_path)
sky_autogen_comment, host_name, external_worker_ips[idx],
username, key_path)

# This checks if all codegens have been created.
for idx, ip in enumerate(worker_ips):
for idx, ip in enumerate(external_worker_ips):
if not codegens[idx]:
codegens[idx] = cls._get_generated_config(
sky_autogen_comment, worker_names[idx], ip, username,
key_path)

for idx in range(len(worker_ips)):
for idx in range(len(external_worker_ips)):
# Add (or overwrite) the new config.
overwrite = overwrites[idx]
overwrite_begin_idx = overwrite_begin_idxs[idx]
Expand Down Expand Up @@ -1138,7 +1143,7 @@ def get_node_ips(cluster_yaml: str,
head_ip_max_attempts: int = 1,
worker_ip_max_attempts: int = 1,
get_internal_ips: bool = False) -> List[str]:
"""Returns the IPs of all nodes in the cluster."""
"""Returns the IPs of all nodes in the cluster, with head node at front."""
iojw marked this conversation as resolved.
Show resolved Hide resolved

# When ray up launches TPU VM Pod, Pod workers (except for the head)
# won't be connected to Ray cluster. Thus "ray get-worker-ips"
Expand All @@ -1147,12 +1152,10 @@ def get_node_ips(cluster_yaml: str,
ray_config = common_utils.read_yaml(cluster_yaml)
use_tpu_vm = ray_config['provider'].get('_has_tpus', False)
if use_tpu_vm:
return _get_tpu_vm_pod_ips(ray_config, get_internal_ips)

# Try to optimize for the common case where we have 1 node.
if (expected_num_nodes == 1 and handle is not None and
handle.head_ip is not None):
return [handle.head_ip]
ips = _get_tpu_vm_pod_ips(ray_config, get_internal_ips)
assert expected_num_nodes == 1, 'TPU VM only supports single node for now.'
if len(ips) != expected_num_nodes:
raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.HEAD)

if get_internal_ips:
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
Expand Down Expand Up @@ -1257,8 +1260,6 @@ def get_head_ip(
max_attempts: int = 1,
) -> str:
"""Returns the ip of the head node."""
assert not use_cached_head_ip or max_attempts == 1, (
'Cannot use cached_head_ip when max_attempts is not 1')
if use_cached_head_ip:
if handle.head_ip is None:
# This happens for INIT clusters (e.g., exit 1 in setup).
Expand Down Expand Up @@ -1615,26 +1616,26 @@ def _update_cluster_status_no_lock(
try:
# TODO(zhwu): This function cannot distinguish a transient network error
# while fetching IPs through ray from the ray runtime itself failing.
ips = get_node_ips(handle.cluster_yaml, handle.launched_nodes)
external_ips = handle.external_ips(use_cached_ips=False)
# This happens to a stopped TPU VM as we use gcloud to query the IP.
if len(ips) == 0:
if len(external_ips) == 0:
raise exceptions.FetchIPError(
reason=exceptions.FetchIPError.Reason.HEAD)
if handle.launched_nodes == 1:
# Check the ray cluster status. We have to check it in the single-node
# case, since get_node_ips() does not require the ray cluster to be
# running.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml)
runner = command_runner.SSHCommandRunner(ips[0], **ssh_credentials)
runner = command_runner.SSHCommandRunner(external_ips[0],
**ssh_credentials)
returncode = runner.run('ray status', stream_logs=False)
if returncode:
raise exceptions.FetchIPError(
reason=exceptions.FetchIPError.Reason.HEAD)
# If we get node ips correctly, the cluster is UP. It is safe to
# set the status to UP, as the `get_node_ips` function uses ray
# set the status to UP, as the `handle.external_ips` function uses ray
# to fetch IPs and starting ray is the final step of sky launch.
record['status'] = global_user_state.ClusterStatus.UP
iojw marked this conversation as resolved.
Show resolved Hide resolved
handle.head_ip = ips[0]
global_user_state.add_or_update_cluster(cluster_name,
handle,
ready=True,
Expand Down
Loading