[Core] Command runner refactor and avoid source ~/.bashrc for better speed #3484

Merged
merged 109 commits on May 8, 2024
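This PR reworks the command runner layer: SSHCommandRunner now takes the node as an (ip, port) tuple instead of separate ip and port arguments, run() gains a source_bashrc flag so internal commands skip sourcing ~/.bashrc (the speed-up in the title), and FetchIPError is renamed to FetchClusterInfoError. A minimal usage sketch inferred from the call sites in the diff below; the credential keys and values are placeholders, not part of this PR:

    from sky.utils import command_runner

    # Placeholder credentials; the real code builds this dict with
    # backend_utils.ssh_credential_from_yaml(cluster_yaml, docker_user).
    ssh_credentials = {'ssh_user': 'ubuntu', 'ssh_private_key': '~/.ssh/sky-key'}

    # New-style constructor: the node is an (ip, port) tuple.
    runner = command_runner.SSHCommandRunner(node=('1.2.3.4', 22),
                                             **ssh_credentials)

    # Internal commands no longer source ~/.bashrc unless the caller opts in.
    rc, stdout, stderr = runner.run('echo hello',
                                    require_outputs=True,
                                    source_bashrc=False)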
Commits (109)
1571e4e
remove job_owner
Michaelvll Jan 27, 2024
afd5660
remove some clouds.Local related code
Michaelvll Jan 27, 2024
8314263
Remove Local cloud entirely
Michaelvll Jan 27, 2024
1a025c5
remove local cloud
Michaelvll Jan 27, 2024
c9f79b0
fix
Michaelvll Jan 27, 2024
308ca0f
slurm runner
Feb 3, 2024
f417c5f
kubernetes runner
Michaelvll Feb 14, 2024
520d457
Use command runner for kubernetes
Michaelvll Feb 14, 2024
d252582
rename back to ssh
Michaelvll Feb 14, 2024
e8bbd18
refactor runners in backend
Michaelvll Feb 14, 2024
645f067
Merge branch 'remove-local-cloud' of github.com:skypilot-org/skypilot…
Michaelvll Feb 14, 2024
0744608
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Feb 14, 2024
99aea26
fix
Michaelvll Feb 14, 2024
87ab4a8
fix
Michaelvll Feb 14, 2024
19791b1
fix rsync
Michaelvll Feb 14, 2024
84eff39
Fix runner
Michaelvll Feb 14, 2024
4c5f0e2
Fix run()
Michaelvll Feb 14, 2024
ed50603
errors and fix head runner
Michaelvll Feb 14, 2024
2355521
Merge branch 'master' of github.com:skypilot-org/skypilot into remove…
Michaelvll Feb 14, 2024
7486c0f
Merge branch 'remove-local-cloud' of github.com:skypilot-org/skypilot…
Michaelvll Feb 14, 2024
eae92cb
support different mode
Michaelvll Feb 14, 2024
07b0234
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Feb 21, 2024
8c0ffe1
Merge branch 'master' of github.com:skypilot-org/skypilot into kubern…
Michaelvll Mar 29, 2024
5a6b4c7
format
Michaelvll Mar 29, 2024
cbb7ac8
use whoami instead of $USER
Michaelvll Mar 29, 2024
872cfe5
timeline for run and rsync
Michaelvll Mar 30, 2024
69816a7
lazy imports for pandas and lazy data frame
Michaelvll Mar 30, 2024
065b80d
fix fetch_aws
Michaelvll Mar 30, 2024
3a27f61
fix fetchers
Michaelvll Mar 30, 2024
769840f
avoid sync script for task
Michaelvll Mar 30, 2024
356d851
Merge branch 'lazy-imports' of github.com:skypilot-org/skypilot into …
Michaelvll Mar 30, 2024
9cfaf09
add timeline
Michaelvll Mar 30, 2024
bf1ea40
cache cluster_info
Michaelvll Mar 30, 2024
68ebfdb
format
Michaelvll Mar 30, 2024
05ae471
cache cluster info
Michaelvll Mar 30, 2024
d6a7ef8
do not stream
Michaelvll Mar 30, 2024
9dca1a3
fix skip lines
Michaelvll Mar 30, 2024
b2ec63e
format
Michaelvll Mar 30, 2024
d2e358e
avoid source bashrc or -i for internal exec
Michaelvll Mar 30, 2024
34080c5
format
Michaelvll Mar 30, 2024
326bcb1
use -i
Michaelvll Mar 31, 2024
421e624
Add None arg
Michaelvll Mar 31, 2024
a1122ea
remove kubernets runner
Michaelvll Apr 25, 2024
7753b06
merge
Michaelvll Apr 25, 2024
6fb8711
fix
Michaelvll Apr 25, 2024
19d7016
fix fluidstack and paperspace
Michaelvll Apr 25, 2024
0b22aa7
remove k8s
Michaelvll Apr 25, 2024
26d0058
fix
Michaelvll Apr 25, 2024
16cc521
format
Michaelvll Apr 25, 2024
54131c8
update cluster info for the old clusters
Michaelvll Apr 26, 2024
ffa0312
move update
Michaelvll Apr 26, 2024
de6d6c3
fix backward compat
Michaelvll Apr 26, 2024
877a54c
fix
Michaelvll Apr 26, 2024
cafb93f
fix backward
Michaelvll Apr 26, 2024
95c4e8a
format
Michaelvll Apr 26, 2024
683b577
fix back compat
Michaelvll Apr 26, 2024
993983a
fix backward
Michaelvll Apr 26, 2024
9cb9f75
Update sky/backends/backend_utils.py
Michaelvll Apr 26, 2024
2930fcf
Update sky/utils/command_runner.py
Michaelvll Apr 26, 2024
b583bdf
Update sky/exceptions.py
Michaelvll Apr 26, 2024
4cb6f3f
address comments
Michaelvll Apr 26, 2024
0ca9f58
Merge branch 'command-runner-refactor' of github.com:skypilot-org/sky…
Michaelvll Apr 26, 2024
ec9ff75
format
Michaelvll Apr 26, 2024
cc6f5b7
format
Michaelvll Apr 26, 2024
f90254f
Merge branch 'master' of github.com:skypilot-org/skypilot into comman…
Michaelvll Apr 26, 2024
44e47cc
fix command
Michaelvll Apr 26, 2024
61510b4
Source bashrc for storage mount
Michaelvll Apr 26, 2024
a4d110e
fix mounts
Michaelvll Apr 26, 2024
90ef5a4
longer wait time for autostop on azure
Michaelvll Apr 26, 2024
735c9d2
Fix root command
Michaelvll Apr 26, 2024
8fdafb8
avoid using azure for pipeline to speed up tests
Michaelvll Apr 26, 2024
811d0d2
use aws and gcp only for the pipeline
Michaelvll Apr 26, 2024
5465306
Fix autostop for Azure
Michaelvll Apr 27, 2024
166086f
fix ray command
Michaelvll Apr 27, 2024
283e790
format
Michaelvll Apr 27, 2024
beaf3ce
source bashrc for storage copy
Michaelvll Apr 27, 2024
3e344f3
Fix logic
Michaelvll Apr 27, 2024
cb6cadc
format
Michaelvll Apr 27, 2024
6de1f10
avoid assert for ssh port
Michaelvll Apr 27, 2024
a8f9d2b
set true
Michaelvll Apr 27, 2024
d94ac4b
fix command
Michaelvll Apr 27, 2024
a6375bf
Merge branch 'master' of github.com:skypilot-org/skypilot into comman…
Michaelvll Apr 27, 2024
70d166a
format
Michaelvll Apr 27, 2024
99e143b
add source_bashrc for setup
Michaelvll Apr 28, 2024
6c67413
Fix tests
Michaelvll Apr 28, 2024
d4fd810
avoid two nodes for azure
Michaelvll Apr 28, 2024
b1bb7d3
Update sky/backends/cloud_vm_ray_backend.py
Michaelvll Apr 30, 2024
92486ce
Update sky/backends/cloud_vm_ray_backend.py
Michaelvll Apr 30, 2024
5723f9a
Add comment
Michaelvll Apr 30, 2024
3fc3352
Merge branch 'command-runner-refactor' of github.com:skypilot-org/sky…
Michaelvll Apr 30, 2024
a8fd2a6
Merge branch 'command-runner-refactor' of github.com:skypilot-org/sky…
Michaelvll Apr 30, 2024
92e5b38
format
Michaelvll Apr 30, 2024
1b6a434
Rename
Michaelvll Apr 30, 2024
4ebda95
Use cached ip for `ray status`
Michaelvll Apr 30, 2024
a56c751
reset azure test
Michaelvll Apr 30, 2024
d261cf4
fix error for cluster info loading for replica
Michaelvll May 2, 2024
600e98b
Merge branch 'master' of github.com:skypilot-org/skypilot into comman…
Michaelvll May 6, 2024
cbbf2a1
Merge branch 'master' of github.com:skypilot-org/skypilot into comman…
Michaelvll May 7, 2024
1b474e7
add comments
Michaelvll May 7, 2024
03028d0
longer time for azure stop
Michaelvll May 7, 2024
1a0cf52
Merge branch 'master' of github.com:skypilot-org/skypilot into comman…
Michaelvll May 7, 2024
307c019
fix runner
Michaelvll May 7, 2024
78b0116
longer time
Michaelvll May 7, 2024
33893aa
format
Michaelvll May 7, 2024
023adcb
remove zone for auto_restart yaml
Michaelvll May 8, 2024
74e06aa
install jq
Michaelvll May 8, 2024
7833f81
Merge branch 'master' of github.com:skypilot-org/skypilot into comman…
Michaelvll May 8, 2024
b53131e
fix source bashrc for run
Michaelvll May 8, 2024
65001c8
source bashrc for ray cluster
Michaelvll May 8, 2024
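Several of the commits above ("Source bashrc for storage mount", "add source_bashrc for setup", "source bashrc for storage copy") opt back into sourcing ~/.bashrc only where the user's shell environment matters. A hedged caller-side sketch of the helper updated in the diff below; the argument values are illustrative and parameters not visible in the hunk are omitted:

    runners = handle.get_command_runners()  # List[command_runner.CommandRunner]
    backend_utils.parallel_data_transfer_to_nodes(
        runners,
        source='~/workdir',
        target='~/sky_workdir',
        cmd='mkdir -p ~/sky_workdir',
        action_message='Syncing workdir',
        stream_logs=False,
        # Setup and sync commands skip ~/.bashrc for speed; storage mounts
        # pass source_bashrc=True instead.
        source_bashrc=False,
    )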
91 changes: 39 additions & 52 deletions sky/backends/backend_utils.py
@@ -333,7 +333,7 @@ def wrap_file_mount(cls, path: str) -> str:
def make_safe_symlink_command(cls, *, source: str, target: str) -> str:
"""Returns a command that safely symlinks 'source' to 'target'.

All intermediate directories of 'source' will be owned by $USER,
All intermediate directories of 'source' will be owned by $(whoami),
excluding the root directory (/).

'source' must be an absolute path; both 'source' and 'target' must not
@@ -360,17 +360,17 @@ def make_safe_symlink_command(cls, *, source: str, target: str) -> str:
target)
# Below, use sudo in case the symlink needs sudo access to create.
# Prepare to create the symlink:
# 1. make sure its dir(s) exist & are owned by $USER.
# 1. make sure its dir(s) exist & are owned by $(whoami).
dir_of_symlink = os.path.dirname(source)
commands = [
# mkdir, then loop over '/a/b/c' as /a, /a/b, /a/b/c. For each,
# chown $USER on it so user can use these intermediate dirs
# chown $(whoami) on it so user can use these intermediate dirs
# (excluding /).
f'sudo mkdir -p {dir_of_symlink}',
# p: path so far
('(p=""; '
f'for w in $(echo {dir_of_symlink} | tr "/" " "); do '
'p=${p}/${w}; sudo chown $USER $p; done)')
'p=${p}/${w}; sudo chown $(whoami) $p; done)')
]
# 2. remove any existing symlink (ln -f may throw 'cannot
# overwrite directory', if the link exists and points to a
@@ -386,7 +386,7 @@ def make_safe_symlink_command(cls, *, source: str, target: str) -> str:
# Link.
f'sudo ln -s {target} {source}',
# chown. -h to affect symlinks only.
f'sudo chown -h $USER {source}',
f'sudo chown -h $(whoami) {source}',
]
return ' && '.join(commands)

@@ -1080,7 +1080,7 @@ def get_ready_nodes_counts(pattern, output):
def get_docker_user(ip: str, cluster_config_file: str) -> str:
"""Find docker container username."""
ssh_credentials = ssh_credential_from_yaml(cluster_config_file)
runner = command_runner.SSHCommandRunner(ip, port=22, **ssh_credentials)
runner = command_runner.SSHCommandRunner(node=(ip, 22), **ssh_credentials)
container_name = constants.DEFAULT_DOCKER_CONTAINER_NAME
whoami_returncode, whoami_stdout, whoami_stderr = runner.run(
f'sudo docker exec {container_name} whoami',
@@ -1113,7 +1113,7 @@ def wait_until_ray_cluster_ready(
try:
head_ip = _query_head_ip_with_retries(
cluster_config_file, max_attempts=WAIT_HEAD_NODE_IP_MAX_ATTEMPTS)
except exceptions.FetchIPError as e:
except exceptions.FetchClusterInfoError as e:
logger.error(common_utils.format_exception(e))
return False, None # failed

@@ -1129,8 +1129,7 @@ def wait_until_ray_cluster_ready(
ssh_credentials = ssh_credential_from_yaml(cluster_config_file, docker_user)
last_nodes_so_far = 0
start = time.time()
runner = command_runner.SSHCommandRunner(head_ip,
port=22,
runner = command_runner.SSHCommandRunner(node=(head_ip, 22),
**ssh_credentials)
with rich_utils.safe_status(
'[bold cyan]Waiting for workers...') as worker_status:
@@ -1236,7 +1235,7 @@ def ssh_credential_from_yaml(


def parallel_data_transfer_to_nodes(
runners: List[command_runner.SSHCommandRunner],
runners: List[command_runner.CommandRunner],
source: Optional[str],
target: str,
cmd: Optional[str],
@@ -1246,32 +1245,36 @@ def parallel_data_transfer_to_nodes(
# Advanced options.
log_path: str = os.devnull,
stream_logs: bool = False,
source_bashrc: bool = False,
):
"""Runs a command on all nodes and optionally runs rsync from src->dst.

Args:
runners: A list of SSHCommandRunner objects that represent multiple nodes.
runners: A list of CommandRunner objects that represent multiple nodes.
source: Optional[str]; Source for rsync on local node
target: str; Destination on remote node for rsync
cmd: str; Command to be executed on all nodes
action_message: str; Message to be printed while the command runs
log_path: str; Path to the log file
stream_logs: bool; Whether to stream logs to stdout
source_bashrc: bool; Source bashrc before running the command.
"""
fore = colorama.Fore
style = colorama.Style

origin_source = source

def _sync_node(runner: 'command_runner.SSHCommandRunner') -> None:
def _sync_node(runner: 'command_runner.CommandRunner') -> None:
if cmd is not None:
rc, stdout, stderr = runner.run(cmd,
log_path=log_path,
stream_logs=stream_logs,
require_outputs=True)
require_outputs=True,
source_bashrc=source_bashrc)
err_msg = ('Failed to run command before rsync '
f'{origin_source} -> {target}. '
'Ensure that the network is stable, then retry.')
'Ensure that the network is stable, then retry. '
f'{cmd}')
if log_path != os.devnull:
err_msg += f' See logs in {log_path}'
subprocess_utils.handle_returncode(rc,
@@ -1336,7 +1339,7 @@ def _query_head_ip_with_retries(cluster_yaml: str,
"""Returns the IP of the head node by querying the cloud.

Raises:
exceptions.FetchIPError: if we failed to get the head IP.
exceptions.FetchClusterInfoError: if we failed to get the head IP.
"""
backoff = common_utils.Backoff(initial_backoff=5, max_backoff_factor=5)
for i in range(max_attempts):
@@ -1365,8 +1368,8 @@ def _query_head_ip_with_retries(cluster_yaml: str,
break
except subprocess.CalledProcessError as e:
if i == max_attempts - 1:
raise exceptions.FetchIPError(
reason=exceptions.FetchIPError.Reason.HEAD) from e
raise exceptions.FetchClusterInfoError(
reason=exceptions.FetchClusterInfoError.Reason.HEAD) from e
# Retry if the cluster is not up yet.
logger.debug('Retrying to get head ip.')
time.sleep(backoff.current_backoff())
@@ -1391,7 +1394,7 @@ def get_node_ips(cluster_yaml: str,
IPs.

Raises:
exceptions.FetchIPError: if we failed to get the IPs. e.reason is
exceptions.FetchClusterInfoError: if we failed to get the IPs. e.reason is
HEAD or WORKER.
"""
ray_config = common_utils.read_yaml(cluster_yaml)
@@ -1412,11 +1415,12 @@ def get_node_ips(cluster_yaml: str,
'Failed to get cluster info for '
f'{ray_config["cluster_name"]} from the new provisioner '
f'with {common_utils.format_exception(e)}.')
raise exceptions.FetchIPError(
exceptions.FetchIPError.Reason.HEAD) from e
raise exceptions.FetchClusterInfoError(
exceptions.FetchClusterInfoError.Reason.HEAD) from e
if len(metadata.instances) < expected_num_nodes:
# Simulate the exception when Ray head node is not up.
raise exceptions.FetchIPError(exceptions.FetchIPError.Reason.HEAD)
raise exceptions.FetchClusterInfoError(
exceptions.FetchClusterInfoError.Reason.HEAD)
return metadata.get_feasible_ips(get_internal_ips)

if get_internal_ips:
@@ -1446,8 +1450,8 @@ def get_node_ips(cluster_yaml: str,
break
except subprocess.CalledProcessError as e:
if retry_cnt == worker_ip_max_attempts - 1:
raise exceptions.FetchIPError(
exceptions.FetchIPError.Reason.WORKER) from e
raise exceptions.FetchClusterInfoError(
exceptions.FetchClusterInfoError.Reason.WORKER) from e
# Retry if the ssh is not ready for the workers yet.
backoff_time = backoff.current_backoff()
logger.debug('Retrying to get worker ip '
@@ -1472,8 +1476,8 @@ def get_node_ips(cluster_yaml: str,
f'detected IP(s): {worker_ips[-n:]}.')
worker_ips = worker_ips[-n:]
else:
raise exceptions.FetchIPError(
exceptions.FetchIPError.Reason.WORKER)
raise exceptions.FetchClusterInfoError(
exceptions.FetchClusterInfoError.Reason.WORKER)
else:
worker_ips = []
return head_ip_list + worker_ips
@@ -1760,42 +1764,25 @@ def _update_cluster_status_no_lock(

def run_ray_status_to_check_ray_cluster_healthy() -> bool:
try:
# TODO(zhwu): This function cannot distinguish transient network
# error in ray's get IPs vs. ray runtime failing.

# NOTE: fetching the IPs is very slow as it calls into
# `ray get head-ip/worker-ips`. Using cached IPs is safe because
# in the worst case we time out in the `ray status` SSH command
# below.
external_ips = handle.cached_external_ips
runners = handle.get_command_runners(force_cached=True)
# This happens when user interrupt the `sky launch` process before
# the first time resources handle is written back to local database.
# This is helpful when user interrupt after the provision is done
# and before the skylet is restarted. After #2304 is merged, this
# helps keep the cluster status to INIT after `sky status -r`, so
# user will be notified that any auto stop/down might not be
# triggered.
if external_ips is None or len(external_ips) == 0:
if not runners:
logger.debug(f'Refreshing status ({cluster_name!r}): No cached '
f'IPs found. Handle: {handle}')
raise exceptions.FetchIPError(
reason=exceptions.FetchIPError.Reason.HEAD)

# Potentially refresh the external SSH ports, in case the existing
# cluster before #2491 was launched without external SSH ports
# cached.
external_ssh_ports = handle.external_ssh_ports()
head_ssh_port = external_ssh_ports[0]

# Check if ray cluster status is healthy.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml,
handle.docker_user,
handle.ssh_user)

runner = command_runner.SSHCommandRunner(external_ips[0],
**ssh_credentials,
port=head_ssh_port)
rc, output, stderr = runner.run(
raise exceptions.FetchClusterInfoError(
reason=exceptions.FetchClusterInfoError.Reason.HEAD)
head_runner = runners[0]
rc, output, stderr = head_runner.run(
instance_setup.RAY_STATUS_WITH_SKY_RAY_PORT_COMMAND,
stream_logs=False,
require_outputs=True,
@@ -1815,7 +1802,7 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
f'Refreshing status ({cluster_name!r}): ray status not showing '
f'all nodes ({ready_head + ready_workers}/'
f'{total_nodes}); output: {output}; stderr: {stderr}')
except exceptions.FetchIPError:
except exceptions.FetchClusterInfoError:
logger.debug(
f'Refreshing status ({cluster_name!r}) failed to get IPs.')
except RuntimeError as e:
@@ -2356,9 +2343,9 @@ def is_controller_accessible(
handle.docker_user,
handle.ssh_user)

runner = command_runner.SSHCommandRunner(handle.head_ip,
**ssh_credentials,
port=handle.head_ssh_port)
runner = command_runner.SSHCommandRunner(node=(handle.head_ip,
handle.head_ssh_port),
**ssh_credentials)
if not runner.check_connection():
error_msg = controller.value.connection_error_hint
else:
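In short, code touching the runner migrates along these lines; a rough before/after sketch mirroring the call sites in the diff above (values are illustrative):

    # Before this PR:
    #   runner = command_runner.SSHCommandRunner(ip, port=22, **ssh_credentials)
    #   ... except exceptions.FetchIPError as e: ...
    # After this PR:
    runner = command_runner.SSHCommandRunner(node=(ip, 22), **ssh_credentials)
    try:
        head_ip = _query_head_ip_with_retries(cluster_yaml, max_attempts=5)
    except exceptions.FetchClusterInfoError as e:
        logger.error(common_utils.format_exception(e))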