[Backward Compatibility][Spot] Avoid cluster leakage by ray yaml overwritten and reduce spot controller cost on AWS #1235
@@ -31,6 +31,7 @@
 from ray.autoscaler._private import util as ray_util
 import rich.console as rich_console
 import rich.progress as rich_progress
+import yaml
 
 import sky
 from sky import authentication as auth
@@ -109,12 +110,33 @@
 # Remote dir that holds our runtime files.
 _REMOTE_RUNTIME_FILES_DIR = '~/.sky/.runtime_files'
 
+# Include the fields that will be used for generating tags that distinguish the
+# cluster in Ray, to avoid the stopped cluster being discarded due to updates
+# in the yaml template.
+# Some notes on the fields:
+# - 'provider' fields will be used for bootstrapping and will insert new items
+#   into 'node_config'.
+# - Keeping 'auth' alone is not enough, because the content of the key file
+#   will be used for calculating the hash.
+# TODO(zhwu): Keep in sync with the fields used in https://github.com/ray-project/ray/blob/e4ce38d001dbbe09cd21c497fedd03d692b2be3e/python/ray/autoscaler/_private/commands.py#L687-L701
+_RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY = {
+    'provider', 'auth', 'node_config'
+}
+
 
 def is_ip(s: str) -> bool:
     """Returns whether this string matches IP_ADDR_REGEX."""
     return len(re.findall(IP_ADDR_REGEX, s)) == 1
 
 
+def _get_yaml_path_from_cluster_name(cluster_name: str,
+                                     prefix: str = SKY_USER_FILE_PATH) -> str:
+    output_path = pathlib.Path(
+        prefix).expanduser().resolve() / f'{cluster_name}.yml'
+    os.makedirs(output_path.parents[0], exist_ok=True)
+    return str(output_path)
+
+
 def fill_template(template_name: str,
                   variables: Dict,
                   output_path: Optional[str] = None,
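For context on where these restored keys live in a Ray cluster YAML: 'provider' and 'auth' are top-level sections, while 'node_config' only appears nested under each entry of available_node_types, which is why the restore logic further below walks the tree recursively. The sketch below is illustrative only; the YAML is a heavily trimmed, made-up example rather than SkyPilot's actual template.

```python
# Illustrative sketch: a trimmed, made-up Ray cluster YAML showing where the
# keys in _RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY live.
import yaml

example_ray_yaml = """
cluster_name: my-cluster
max_workers: 0            # Not in the restore set.
provider:                 # Top-level key in the restore set.
  type: aws
  region: us-east-1
auth:                     # Top-level key in the restore set.
  ssh_user: ubuntu
  ssh_private_key: ~/.ssh/sky-key
available_node_types:
  ray.head.default:
    resources: {}
    node_config:          # Nested key in the restore set.
      InstanceType: m5.2xlarge
"""

config = yaml.safe_load(example_ray_yaml)
print('provider' in config)       # True: top-level section.
print('node_config' in config)    # False: only nested under node types.
print('node_config' in config['available_node_types']['ray.head.default'])  # True
```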
@@ -129,10 +151,8 @@ def fill_template(template_name: str,
     if output_path is None:
         assert ('cluster_name' in variables), ('cluster_name is required.')
         cluster_name = variables.get('cluster_name')
-        output_path = pathlib.Path(
-            output_prefix).expanduser() / f'{cluster_name}.yml'
-        os.makedirs(output_path.parents[0], exist_ok=True)
-        output_path = str(output_path)
+        output_path = _get_yaml_path_from_cluster_name(cluster_name,
+                                                       output_prefix)
         output_path = os.path.abspath(output_path)
 
     # Add yaml file path to the template variables.
@@ -655,6 +675,32 @@ def _remove_multinode_config(
             break
 
 
+def _restore_original_block_for_yaml(new_yaml: str, old_yaml: str,
+                                     key_names: Set[str]) -> str:
+    """Replaces 'new' with 'old' for all keys in key_names.
+
+    The replacement is applied recursively, and only to blocks whose key is
+    in key_names and that have the same ancestors in both the 'new' and
+    'old' YAML trees.
+    """
+
+    def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]):
+        for key, value in list(new_block.items()):  # copy: keys may be deleted
+            if key in key_names:
+                if key in old_block:
+                    new_block[key] = old_block[key]
+                else:
+                    del new_block[key]
+            elif isinstance(value, dict):
+                if key in old_block:
+                    _restore_block(value, old_block[key])
+
+    new_config = yaml.safe_load(new_yaml)
+    old_config = yaml.safe_load(old_yaml)
+    _restore_block(new_config, old_config)
+    return common_utils.dump_yaml_str(new_config)
+
+
 # TODO: too many things happening here - leaky abstraction. Refactor.
 @timeline.event
 def write_cluster_config(to_provision: 'resources.Resources',
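To see the intended effect of `_restore_original_block_for_yaml` end to end, here is a self-contained sketch that re-implements the same merge with plain PyYAML (SkyPilot's `common_utils.dump_yaml_str` is replaced by `yaml.dump`); the two YAML snippets and the key set are invented for illustration.

```python
# Standalone sketch of the restore behavior, using plain PyYAML instead of
# SkyPilot's common_utils.dump_yaml_str. The YAML snippets are invented.
from typing import Any, Dict, Set

import yaml


def restore_blocks(new_yaml: str, old_yaml: str, key_names: Set[str]) -> str:
    def _restore(new_block: Dict[str, Any], old_block: Dict[str, Any]):
        for key, value in list(new_block.items()):
            if key in key_names:
                if key in old_block:
                    new_block[key] = old_block[key]  # keep the old block verbatim
                else:
                    del new_block[key]
            elif isinstance(value, dict) and key in old_block:
                _restore(value, old_block[key])

    new_config = yaml.safe_load(new_yaml)
    old_config = yaml.safe_load(old_yaml)
    _restore(new_config, old_config)
    return yaml.dump(new_config)


old = """
auth:
  ssh_user: ubuntu
  ssh_private_key: ~/.ssh/old-key
setup_commands:
  - echo old-setup
"""
new = """
auth:
  ssh_user: ubuntu
  ssh_private_key: ~/.ssh/new-key
setup_commands:
  - echo new-setup
"""
merged = yaml.safe_load(restore_blocks(new, old, {'auth'}))
print(merged['auth']['ssh_private_key'])  # ~/.ssh/old-key (restored from old)
print(merged['setup_commands'])           # ['echo new-setup'] (kept from new)
```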
@@ -666,7 +712,8 @@ def write_cluster_config(to_provision: 'resources.Resources',
                          region: Optional[clouds.Region] = None,
                          zones: Optional[List[clouds.Zone]] = None,
                          auth_config: Optional[Dict[str, str]] = None,
-                         dryrun: bool = False) -> Dict[str, str]:
+                         dryrun: bool = False,
+                         force_overwrite: bool = True) -> Dict[str, str]:
     """Fills in cluster configuration templates and writes them out.
 
     Returns: {provisioner: path to yaml, the provisioning spec}.
@@ -700,6 +747,15 @@
         auth_config = onprem_utils.get_local_auth_config(cluster_name)
     region_name = resources_vars.get('region')
 
+    yaml_path = _get_yaml_path_from_cluster_name(cluster_name)
+    old_yaml_content = None
+    if os.path.exists(yaml_path):
+        if force_overwrite:
+            os.unlink(yaml_path)
+        else:
+            with open(yaml_path, 'r') as f:
+                old_yaml_content = f.read()
+
     yaml_path = fill_template(
         cluster_config_template,
         dict(

Review comment (on the `force_overwrite` branch): When would be the case that the cluster didn't exist but this file exists? If this is an exceptional case to guard against, rename it to ...

Reply: The configure yaml files may not be correctly deleted if an error happens during ...
@@ -750,6 +806,16 @@
     # been fully tested yet.
     _optimize_file_mounts(yaml_path)
 
+    # Restore the old yaml content for backward compatibility.
+    if old_yaml_content is not None:
+        with open(yaml_path, 'r') as f:
+            new_yaml_content = f.read()
+        restored_yaml_content = _restore_original_block_for_yaml(
+            new_yaml_content, old_yaml_content,
+            _RAY_YAML_KEYS_TO_RESTORE_FOR_BACK_COMPATIBILITY)
+        with open(yaml_path, 'w') as f:
+            f.write(restored_yaml_content)
+
     usage_lib.messages.usage.update_ray_yaml(yaml_path)
     # For TPU nodes. TPU VMs do not need TPU_NAME.
     if (resources_vars.get('tpu_type') is not None and
Review comment: Is it true that none of the following contribute to the launch hash?

- `resources` under `ray.head.default`

Just figuring out whether we should preserve these fields.

Reply: I just added `cluster_name` to the key set. For `resources` and `head_node_type`, I think it would be better to add them when we actually want to modify those fields and find them affecting backward compatibility in the future. As far as I understand, `max_workers`, `upscaling_speed` and `idle_timeout_minutes` won't affect the launch hash.
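For reference, the launch hash discussed above is derived by the Ray autoscaler roughly as sketched below (a paraphrase from memory of `hash_launch_conf` in `ray/autoscaler/_private/util.py` and the linked `commands.py`; treat the details as an approximation, not Ray's verbatim code). Only the node's config and the `auth` section, with key file paths replaced by the key files' contents, feed the hash, so top-level settings such as `max_workers`, `upscaling_speed`, or `idle_timeout_minutes` do not change it.

```python
# Rough sketch of how a launch hash over the node config and auth could be
# computed (modeled on Ray's hash_launch_conf; an approximation, not verbatim).
import hashlib
import json
import os


def launch_hash_sketch(node_config: dict, auth: dict) -> str:
    # Replace key file paths with the key files' contents, so that changing
    # the key material (not just the path) also changes the hash.
    full_auth = auth.copy()
    for key_type in ('ssh_private_key', 'ssh_public_key'):
        if key_type in auth:
            with open(os.path.expanduser(auth[key_type])) as f:
                full_auth[key_type] = f.read()
    hasher = hashlib.sha1()
    hasher.update(
        json.dumps([node_config, full_auth], sort_keys=True).encode('utf-8'))
    return hasher.hexdigest()
```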