Skip to content

Commit

Permalink
[Core] Task level config (#3689)
Browse files Browse the repository at this point in the history
* Add docker run options

* Add docs

* Add warning for docker run options in kubernetes

* Add experimental config

* fix

* rename vars

* type

* format

* wip

* rename and add tests

* Fixes and add tests

* format

* Assert for override configs specification

* format

* Add comments

* fix

* fix assertions

* fix assertions

* Fix test

* fix

* remove unsupported keys

* format
  • Loading branch information
Michaelvll authored Jul 10, 2024
1 parent 38b101d commit 5e23f16
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 80 deletions.
33 changes: 8 additions & 25 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,23 +874,8 @@ def write_cluster_config(
f'open(os.path.expanduser("{constants.SKY_REMOTE_RAY_PORT_FILE}"), "w", encoding="utf-8"))\''
)

# Docker run options
docker_run_options = skypilot_config.get_nested(('docker', 'run_options'),
[])
if isinstance(docker_run_options, str):
docker_run_options = [docker_run_options]
if docker_run_options and isinstance(to_provision.cloud, clouds.Kubernetes):
logger.warning(f'{colorama.Style.DIM}Docker run options are specified, '
'but ignored for Kubernetes: '
f'{" ".join(docker_run_options)}'
f'{colorama.Style.RESET_ALL}')

# Use a tmp file path to avoid incomplete YAML file being re-used in the
# future.
initial_setup_commands = []
if (skypilot_config.get_nested(('nvidia_gpus', 'disable_ecc'), False) and
to_provision.accelerators is not None):
initial_setup_commands.append(constants.DISABLE_GPU_ECC_COMMAND)
tmp_yaml_path = yaml_path + '.tmp'
common_utils.fill_template(
cluster_config_template,
Expand Down Expand Up @@ -922,8 +907,6 @@ def write_cluster_config(
# currently only used by GCP.
'specific_reservations': specific_reservations,

# Initial setup commands.
'initial_setup_commands': initial_setup_commands,
# Conda setup
'conda_installation_commands':
constants.CONDA_INSTALLATION_COMMANDS,
Expand All @@ -935,9 +918,6 @@ def write_cluster_config(
wheel_hash).replace('{cloud}',
str(cloud).lower())),

# Docker
'docker_run_options': docker_run_options,

# Port of Ray (GCS server).
# Ray's default port 6379 is conflicted with Redis.
'ray_port': constants.SKY_REMOTE_RAY_PORT,
Expand Down Expand Up @@ -976,17 +956,20 @@ def write_cluster_config(
output_path=tmp_yaml_path)
config_dict['cluster_name'] = cluster_name
config_dict['ray'] = yaml_path

# Add kubernetes config fields from ~/.sky/config
if isinstance(cloud, clouds.Kubernetes):
kubernetes_utils.combine_pod_config_fields(
tmp_yaml_path,
cluster_config_overrides=to_provision.cluster_config_overrides)
kubernetes_utils.combine_metadata_fields(tmp_yaml_path)

if dryrun:
# If dryrun, return the unfinished tmp yaml path.
config_dict['ray'] = tmp_yaml_path
return config_dict
_add_auth_to_cluster_config(cloud, tmp_yaml_path)

# Add kubernetes config fields from ~/.sky/config
if isinstance(cloud, clouds.Kubernetes):
kubernetes_utils.combine_pod_config_fields(tmp_yaml_path)
kubernetes_utils.combine_metadata_fields(tmp_yaml_path)

# Restore the old yaml content for backward compatibility.
if os.path.exists(yaml_path) and keep_launch_fields_in_existing_config:
with open(yaml_path, 'r', encoding='utf-8') as f:
Expand Down
4 changes: 2 additions & 2 deletions sky/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def get_all_clouds():
# Use allowed_clouds from config if it exists, otherwise check all clouds.
# Also validate names with get_cloud_tuple.
config_allowed_cloud_names = [
get_cloud_tuple(c)[0] for c in skypilot_config.get_nested(
['allowed_clouds'], get_all_clouds())
get_cloud_tuple(c)[0] for c in skypilot_config.get_nested((
'allowed_clouds',), get_all_clouds())
]
# Use disallowed_cloud_names for logging the clouds that will be disabled
# because they are not included in allowed_clouds in config.yaml.
Expand Down
10 changes: 7 additions & 3 deletions sky/clouds/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,10 @@ def _unsupported_features_for_resources(
# because `skypilot_config` may change for an existing cluster.
# Clusters created with MIG (only GPU clusters) cannot be stopped.
if (skypilot_config.get_nested(
('gcp', 'managed_instance_group'), None) is not None and
resources.accelerators):
('gcp', 'managed_instance_group'),
None,
override_configs=resources.cluster_config_overrides) is not None
and resources.accelerators):
unsupported[clouds.CloudImplementationFeatures.STOP] = (
'Managed Instance Group (MIG) does not support stopping yet.')
unsupported[clouds.CloudImplementationFeatures.SPOT_INSTANCE] = (
Expand Down Expand Up @@ -506,7 +508,9 @@ def make_deploy_resources_variables(
resources_vars['tpu_node_name'] = tpu_node_name

managed_instance_group_config = skypilot_config.get_nested(
('gcp', 'managed_instance_group'), None)
('gcp', 'managed_instance_group'),
None,
override_configs=resources.cluster_config_overrides)
use_mig = managed_instance_group_config is not None
resources_vars['gcp_use_managed_instance_group'] = use_mig
# Convert boolean to 0 or 1 in string, as GCP does not support boolean
Expand Down
22 changes: 12 additions & 10 deletions sky/clouds/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ class Kubernetes(clouds.Cloud):

SKY_SSH_KEY_SECRET_NAME = 'sky-ssh-keys'
SKY_SSH_JUMP_NAME = 'sky-ssh-jump-pod'
# Timeout for resource provisioning. This timeout determines how long to
# wait for pod to be in pending status before giving up.
# Larger timeout may be required for autoscaling clusters, since autoscaler
# may take some time to provision new nodes.
# Note that this timeout includes time taken by the Kubernetes scheduler
# itself, which can be up to 2-3 seconds.
# For non-autoscaling clusters, we conservatively set this to 10s.
timeout = skypilot_config.get_nested(['kubernetes', 'provision_timeout'],
10)

# Limit the length of the cluster name to avoid exceeding the limit of 63
# characters for Kubernetes resources. We limit to 42 characters (63-21) to
Expand Down Expand Up @@ -309,14 +300,25 @@ def make_deploy_resources_variables(
if resources.use_spot:
spot_label_key, spot_label_value = kubernetes_utils.get_spot_label()

# Timeout for resource provisioning. This timeout determines how long to
# wait for pod to be in pending status before giving up.
# Larger timeout may be required for autoscaling clusters, since
# autoscaler may take some time to provision new nodes.
# Note that this timeout includes time taken by the Kubernetes scheduler
# itself, which can be up to 2-3 seconds.
# For non-autoscaling clusters, we conservatively set this to 10s.
timeout = skypilot_config.get_nested(
('kubernetes', 'provision_timeout'),
10,
override_configs=resources.cluster_config_overrides)
deploy_vars = {
'instance_type': resources.instance_type,
'custom_resources': custom_resources,
'region': region.name,
'cpus': str(cpus),
'memory': str(mem),
'accelerator_count': str(acc_count),
'timeout': str(self.timeout),
'timeout': str(timeout),
'k8s_namespace':
kubernetes_utils.get_current_kube_config_context_namespace(),
'k8s_port_mode': port_mode.value,
Expand Down
22 changes: 16 additions & 6 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,9 +1367,10 @@ def merge_dicts(source: Dict[Any, Any], destination: Dict[Any, Any]):
elif isinstance(value, list) and key in destination:
assert isinstance(destination[key], list), \
f'Expected {key} to be a list, found {destination[key]}'
if key == 'containers':
# If the key is 'containers', we take the first and only
# container in the list and merge it.
if key in ['containers', 'imagePullSecrets']:
# If the key is 'containers' or 'imagePullSecrets', we take the
# first and only container/secret in the list and merge it, as
# we only support one container per pod.
assert len(value) == 1, \
f'Expected only one container, found {value}'
merge_dicts(value[0], destination[key][0])
Expand All @@ -1392,7 +1393,10 @@ def merge_dicts(source: Dict[Any, Any], destination: Dict[Any, Any]):
destination[key] = value


def combine_pod_config_fields(cluster_yaml_path: str) -> None:
def combine_pod_config_fields(
cluster_yaml_path: str,
cluster_config_overrides: Dict[str, Any],
) -> None:
"""Adds or updates fields in the YAML with fields from the ~/.sky/config's
kubernetes.pod_spec dict.
This can be used to add fields to the YAML that are not supported by
Expand Down Expand Up @@ -1434,8 +1438,14 @@ def combine_pod_config_fields(cluster_yaml_path: str) -> None:
with open(cluster_yaml_path, 'r', encoding='utf-8') as f:
yaml_content = f.read()
yaml_obj = yaml.safe_load(yaml_content)
# We don't use override_configs in `skypilot_config.get_nested`, as merging
# the pod config requires special handling.
kubernetes_config = skypilot_config.get_nested(('kubernetes', 'pod_config'),
{})
default_value={},
override_configs={})
override_pod_config = (cluster_config_overrides.get('kubernetes', {}).get(
'pod_config', {}))
merge_dicts(override_pod_config, kubernetes_config)

# Merge the kubernetes config into the YAML for both head and worker nodes.
merge_dicts(
Expand Down Expand Up @@ -1567,7 +1577,7 @@ def get_head_pod_name(cluster_name_on_cloud: str):
def get_autoscaler_type(
) -> Optional[kubernetes_enums.KubernetesAutoscalerType]:
"""Returns the autoscaler type by reading from config"""
autoscaler_type = skypilot_config.get_nested(['kubernetes', 'autoscaler'],
autoscaler_type = skypilot_config.get_nested(('kubernetes', 'autoscaler'),
None)
if autoscaler_type is not None:
autoscaler_type = kubernetes_enums.KubernetesAutoscalerType(
Expand Down
53 changes: 50 additions & 3 deletions sky/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Resources:
"""
# If any fields changed, increment the version. For backward compatibility,
# modify the __setstate__ method to handle the old version.
_VERSION = 18
_VERSION = 19

def __init__(
self,
Expand All @@ -68,6 +68,7 @@ def __init__(
_docker_login_config: Optional[docker_utils.DockerLoginConfig] = None,
_is_image_managed: Optional[bool] = None,
_requires_fuse: Optional[bool] = None,
_cluster_config_overrides: Optional[Dict[str, Any]] = None,
):
"""Initialize a Resources object.
Expand Down Expand Up @@ -218,6 +219,8 @@ def __init__(

self._requires_fuse = _requires_fuse

self._cluster_config_overrides = _cluster_config_overrides

self._set_cpus(cpus)
self._set_memory(memory)
self._set_accelerators(accelerators, accelerator_args)
Expand Down Expand Up @@ -448,6 +451,12 @@ def requires_fuse(self) -> bool:
return False
return self._requires_fuse

@property
def cluster_config_overrides(self) -> Dict[str, Any]:
if self._cluster_config_overrides is None:
return {}
return self._cluster_config_overrides

@requires_fuse.setter
def requires_fuse(self, value: Optional[bool]) -> None:
self._requires_fuse = value
Expand Down Expand Up @@ -1011,13 +1020,39 @@ def make_deploy_variables(self, cluster_name_on_cloud: str,
cloud.make_deploy_resources_variables() method, and the cloud-agnostic
variables are generated by this method.
"""
# Initial setup commands
initial_setup_commands = []
if (skypilot_config.get_nested(
('nvidia_gpus', 'disable_ecc'),
False,
override_configs=self.cluster_config_overrides) and
self.accelerators is not None):
initial_setup_commands = [constants.DISABLE_GPU_ECC_COMMAND]

# Docker run options
docker_run_options = skypilot_config.get_nested(
('docker', 'run_options'),
default_value=[],
override_configs=self.cluster_config_overrides)
if isinstance(docker_run_options, str):
docker_run_options = [docker_run_options]
if docker_run_options and isinstance(self.cloud, clouds.Kubernetes):
logger.warning(
f'{colorama.Style.DIM}Docker run options are specified, '
'but ignored for Kubernetes: '
f'{" ".join(docker_run_options)}'
f'{colorama.Style.RESET_ALL}')

docker_image = self.extract_docker_image()

# Cloud specific variables
cloud_specific_variables = self.cloud.make_deploy_resources_variables(
self, cluster_name_on_cloud, region, zones, dryrun)
docker_image = self.extract_docker_image()
return dict(
cloud_specific_variables,
**{
# Docker config
'docker_run_options': docker_run_options,
# Docker image. The image name used to pull the image, e.g.
# ubuntu:latest.
'docker_image': docker_image,
Expand All @@ -1027,7 +1062,9 @@ def make_deploy_variables(self, cluster_name_on_cloud: str,
constants.DEFAULT_DOCKER_CONTAINER_NAME,
# Docker login config (if any). This helps pull the image from
# private registries.
'docker_login_config': self._docker_login_config
'docker_login_config': self._docker_login_config,
# Initial setup commands.
'initial_setup_commands': initial_setup_commands,
})

def get_reservations_available_resources(self) -> Dict[str, int]:
Expand Down Expand Up @@ -1208,6 +1245,8 @@ def copy(self, **override) -> 'Resources':
_is_image_managed=override.pop('_is_image_managed',
self._is_image_managed),
_requires_fuse=override.pop('_requires_fuse', self._requires_fuse),
_cluster_config_overrides=override.pop(
'_cluster_config_overrides', self._cluster_config_overrides),
)
assert len(override) == 0
return resources
Expand Down Expand Up @@ -1367,6 +1406,8 @@ def _from_yaml_config_single(cls, config: Dict[str, str]) -> 'Resources':
resources_fields['_is_image_managed'] = config.pop(
'_is_image_managed', None)
resources_fields['_requires_fuse'] = config.pop('_requires_fuse', None)
resources_fields['_cluster_config_overrides'] = config.pop(
'_cluster_config_overrides', None)

if resources_fields['cpus'] is not None:
resources_fields['cpus'] = str(resources_fields['cpus'])
Expand Down Expand Up @@ -1410,6 +1451,8 @@ def add_if_not_none(key, value):
if self._docker_login_config is not None:
config['_docker_login_config'] = dataclasses.asdict(
self._docker_login_config)
add_if_not_none('_cluster_config_overrides',
self._cluster_config_overrides)
if self._is_image_managed is not None:
config['_is_image_managed'] = self._is_image_managed
if self._requires_fuse is not None:
Expand Down Expand Up @@ -1525,4 +1568,8 @@ def __setstate__(self, state):
if version < 18:
self._job_recovery = state.pop('_spot_recovery', None)

if version < 19:
self._cluster_config_overrides = state.pop(
'_cluster_config_overrides', None)

self.__dict__.update(state)
12 changes: 12 additions & 0 deletions sky/skylet/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Constants for SkyPilot."""
from typing import List, Tuple

from packaging import version

import sky
Expand Down Expand Up @@ -261,3 +263,13 @@
# Placeholder for the SSH user in proxy command, replaced when the ssh_user is
# known after provisioning.
SKY_SSH_USER_PLACEHOLDER = 'skypilot:ssh_user'

# The keys that can be overridden in the `~/.sky/config.yaml` file. The
# overrides are specified in task YAMLs.
OVERRIDEABLE_CONFIG_KEYS: List[Tuple[str, ...]] = [
('docker', 'run_options'),
('nvidia_gpus', 'disable_ecc'),
('kubernetes', 'pod_config'),
('kubernetes', 'provision_timeout'),
('gcp', 'managed_instance_group'),
]
Loading

0 comments on commit 5e23f16

Please sign in to comment.