diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst index 586e0366cbc5..1465deff853b 100644 --- a/docs/source/reference/yaml-spec.rst +++ b/docs/source/reference/yaml-spec.rst @@ -47,6 +47,12 @@ Available fields: # Format: : (or simply , short for a count of 1). accelerators: V100:4 + # Number of vCPUs per node (optional). + # + # Format: (exactly vCPUs) or + + # (at least vCPUs). + cpus: 32 + # Instance type to use (optional). If 'accelerators' is specified, # the corresponding instance type is automatically inferred. instance_type: p3.8xlarge diff --git a/examples/example_app.py b/examples/example_app.py index 4c48147ea6b1..c26c49703438 100644 --- a/examples/example_app.py +++ b/examples/example_app.py @@ -43,7 +43,7 @@ def make_application(): sky.Resources(sky.AWS(), 'p3.2xlarge'), # 1 V100, EC2. sky.Resources(sky.AWS(), 'p3.8xlarge'), # 4 V100s, EC2. # Tuples mean all resources are required. - sky.Resources(sky.GCP(), 'n1-standard-8', 'tpu-v3-8'), + sky.Resources(sky.GCP(), 'n1-standard-8', accelerators='tpu-v3-8'), }) train_op.set_time_estimator(time_estimators.resnet50_estimate_runtime) @@ -60,8 +60,8 @@ def make_application(): infer_op.set_resources({ sky.Resources(sky.AWS(), 'inf1.2xlarge'), sky.Resources(sky.AWS(), 'p3.2xlarge'), - sky.Resources(sky.GCP(), 'n1-standard-4', 'T4'), - sky.Resources(sky.GCP(), 'n1-standard-8', 'T4'), + sky.Resources(sky.GCP(), 'n1-standard-4', accelerators='T4'), + sky.Resources(sky.GCP(), 'n1-standard-8', accelerators='T4'), }) infer_op.set_time_estimator( diff --git a/sky/cli.py b/sky/cli.py index 918d4b4db901..59dca8b5e9d2 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -174,6 +174,13 @@ def _interactive_node_cli_command(cli_func): default=None, type=str, help='Instance type to use.') + cpus = click.option( + '--cpus', + default=None, + type=str, + help=('Number of vCPUs each instance must have ' + '(e.g., ``--cpus=4`` (exactly 4) or ``--cpus=4+`` (at least 4)). ' + 'This is used to automatically select the instance type.')) gpus = click.option('--gpus', default=None, type=str, @@ -268,6 +275,7 @@ def _interactive_node_cli_command(cli_func): region_option, zone_option, instance_type_option, + cpus, *([gpus] if cli_func.__name__ == 'gpunode' else []), *([tpus] if cli_func.__name__ == 'tpunode' else []), spot_option, @@ -556,6 +564,7 @@ def _parse_override_params(cloud: Optional[str] = None, region: Optional[str] = None, zone: Optional[str] = None, gpus: Optional[str] = None, + cpus: Optional[str] = None, instance_type: Optional[str] = None, use_spot: Optional[bool] = None, image_id: Optional[str] = None, @@ -582,6 +591,11 @@ def _parse_override_params(cloud: Optional[str] = None, override_params['accelerators'] = None else: override_params['accelerators'] = gpus + if cpus is not None: + if cpus.lower() == 'none': + override_params['cpus'] = None + else: + override_params['cpus'] = cpus if instance_type is not None: if instance_type.lower() == 'none': override_params['instance_type'] = None @@ -908,6 +922,7 @@ def _make_task_from_entrypoint_with_overrides( region: Optional[str] = None, zone: Optional[str] = None, gpus: Optional[str] = None, + cpus: Optional[str] = None, instance_type: Optional[str] = None, num_nodes: Optional[int] = None, use_spot: Optional[bool] = None, @@ -949,6 +964,7 @@ def _make_task_from_entrypoint_with_overrides( region=region, zone=zone, gpus=gpus, + cpus=cpus, instance_type=instance_type, use_spot=use_spot, image_id=image_id, @@ -1090,6 +1106,13 @@ def cli(): default=False, help='If used, runs locally inside a docker container.') @_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS) +@click.option('--cpus', + default=None, + type=str, + required=False, + help=('Number of vCPUs each instance must have (e.g., ' + '``--cpus=4`` (exactly 4) or ``--cpus=4+`` (at least 4)). ' + 'This is used to automatically select the instance type.')) @click.option('--disk-size', default=None, type=int, @@ -1154,6 +1177,7 @@ def launch( region: Optional[str], zone: Optional[str], gpus: Optional[str], + cpus: Optional[str], instance_type: Optional[str], num_nodes: Optional[int], use_spot: Optional[bool], @@ -1198,6 +1222,7 @@ def launch( region=region, zone=zone, gpus=gpus, + cpus=cpus, instance_type=instance_type, num_nodes=num_nodes, use_spot=use_spot, @@ -1343,6 +1368,7 @@ def exec( region=region, zone=zone, gpus=gpus, + cpus=None, instance_type=instance_type, use_spot=use_spot, image_id=image_id, @@ -2416,11 +2442,11 @@ def _down_or_stop(name: str): # pylint: disable=redefined-outer-name def gpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], cloud: Optional[str], region: Optional[str], zone: Optional[str], - instance_type: Optional[str], gpus: Optional[str], - use_spot: Optional[bool], screen: Optional[bool], - tmux: Optional[bool], disk_size: Optional[int], - idle_minutes_to_autostop: Optional[int], down: bool, - retry_until_up: bool): + instance_type: Optional[str], cpus: Optional[str], + gpus: Optional[str], use_spot: Optional[bool], + screen: Optional[bool], tmux: Optional[bool], + disk_size: Optional[int], idle_minutes_to_autostop: Optional[int], + down: bool, retry_until_up: bool): """Launch or attach to an interactive GPU node. Examples: @@ -2459,7 +2485,8 @@ def gpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], user_requested_resources = not (cloud is None and region is None and zone is None and instance_type is None and - gpus is None and use_spot is None) + cpus is None and gpus is None and + use_spot is None) default_resources = _INTERACTIVE_NODE_DEFAULT_RESOURCES['gpunode'] cloud_provider = clouds.CLOUD_REGISTRY.from_str(cloud) if gpus is None and instance_type is None: @@ -2472,6 +2499,7 @@ def gpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], region=region, zone=zone, instance_type=instance_type, + cpus=cpus, accelerators=gpus, use_spot=use_spot, disk_size=disk_size) @@ -2495,10 +2523,11 @@ def gpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], # pylint: disable=redefined-outer-name def cpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], cloud: Optional[str], region: Optional[str], zone: Optional[str], - instance_type: Optional[str], use_spot: Optional[bool], - screen: Optional[bool], tmux: Optional[bool], - disk_size: Optional[int], idle_minutes_to_autostop: Optional[int], - down: bool, retry_until_up: bool): + instance_type: Optional[str], cpus: Optional[str], + use_spot: Optional[bool], screen: Optional[bool], + tmux: Optional[bool], disk_size: Optional[int], + idle_minutes_to_autostop: Optional[int], down: bool, + retry_until_up: bool): """Launch or attach to an interactive CPU node. Examples: @@ -2536,7 +2565,7 @@ def cpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], user_requested_resources = not (cloud is None and region is None and zone is None and instance_type is None and - use_spot is None) + cpus is None and use_spot is None) default_resources = _INTERACTIVE_NODE_DEFAULT_RESOURCES['cpunode'] cloud_provider = clouds.CLOUD_REGISTRY.from_str(cloud) if instance_type is None: @@ -2547,6 +2576,7 @@ def cpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], region=region, zone=zone, instance_type=instance_type, + cpus=cpus, use_spot=use_spot, disk_size=disk_size) @@ -2569,11 +2599,12 @@ def cpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], # pylint: disable=redefined-outer-name def tpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], region: Optional[str], zone: Optional[str], - instance_type: Optional[str], tpus: Optional[str], - use_spot: Optional[bool], tpu_vm: Optional[bool], - screen: Optional[bool], tmux: Optional[bool], - disk_size: Optional[int], idle_minutes_to_autostop: Optional[int], - down: bool, retry_until_up: bool): + instance_type: Optional[str], cpus: Optional[str], + tpus: Optional[str], use_spot: Optional[bool], + tpu_vm: Optional[bool], screen: Optional[bool], + tmux: Optional[bool], disk_size: Optional[int], + idle_minutes_to_autostop: Optional[int], down: bool, + retry_until_up: bool): """Launch or attach to an interactive TPU node. Examples: @@ -2610,8 +2641,8 @@ def tpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], name = _default_interactive_node_name('tpunode') user_requested_resources = not (region is None and zone is None and - instance_type is None and tpus is None and - use_spot is None) + instance_type is None and cpus is None and + tpus is None and use_spot is None) default_resources = _INTERACTIVE_NODE_DEFAULT_RESOURCES['tpunode'] accelerator_args = default_resources.accelerator_args if tpu_vm: @@ -2627,6 +2658,7 @@ def tpunode(cluster: str, yes: bool, port_forward: Optional[List[int]], region=region, zone=zone, instance_type=instance_type, + cpus=cpus, accelerators=tpus, accelerator_args=accelerator_args, use_spot=use_spot, @@ -2969,6 +3001,13 @@ def spot(): **_get_shell_complete_args(_complete_file_name)) # TODO(zhwu): Add --dryrun option to test the launch command. @_add_click_options(_TASK_OPTIONS + _EXTRA_RESOURCES_OPTIONS) +@click.option('--cpus', + default=None, + type=str, + required=False, + help=('Number of vCPUs each instance must have (e.g., ' + '``--cpus=4`` (exactly 4) or ``--cpus=4+`` (at least 4)). ' + 'This is used to automatically select the instance type.')) @click.option('--spot-recovery', default=None, type=str, @@ -3011,6 +3050,7 @@ def spot_launch( region: Optional[str], zone: Optional[str], gpus: Optional[str], + cpus: Optional[str], instance_type: Optional[str], num_nodes: Optional[int], use_spot: Optional[bool], @@ -3049,6 +3089,7 @@ def spot_launch( region=region, zone=zone, gpus=gpus, + cpus=cpus, instance_type=instance_type, num_nodes=num_nodes, use_spot=use_spot, diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index 418415040c7a..48c9282420af 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -274,10 +274,10 @@ def is_same_cloud(self, other: clouds.Cloud): return isinstance(other, AWS) @classmethod - def get_default_instance_type(cls) -> str: - # General-purpose instance with 8 vCPUs and 32 GB RAM. - # Intel Ice Lake 8375C - return 'm6i.2xlarge' + def get_default_instance_type(cls, + cpus: Optional[str] = None) -> Optional[str]: + return service_catalog.get_default_instance_type(cpus=cpus, + clouds='aws') # TODO: factor the following three methods, as they are the same logic # between Azure and AWS. @@ -334,12 +334,11 @@ def make_deploy_resources_variables( def get_feasible_launchable_resources(self, resources: 'resources_lib.Resources'): - fuzzy_candidate_list: List[str] = [] if resources.instance_type is not None: assert resources.is_launchable(), resources # Treat Resources(AWS, p3.2x, V100) as Resources(AWS, p3.2x). resources = resources.copy(accelerators=None) - return ([resources], fuzzy_candidate_list) + return ([resources], []) def _make(instance_list): resource_list = [] @@ -350,6 +349,7 @@ def _make(instance_list): # Setting this to None as AWS doesn't separately bill / # attach the accelerators. Billed as part of the VM type. accelerators=None, + cpus=None, ) resource_list.append(r) return resource_list @@ -357,9 +357,13 @@ def _make(instance_list): # Currently, handle a filter on accelerators only. accelerators = resources.accelerators if accelerators is None: - # No requirements to filter, so just return a default VM type. - return (_make([AWS.get_default_instance_type()]), - fuzzy_candidate_list) + # Return a default instance type with the given number of vCPUs. + default_instance_type = AWS.get_default_instance_type( + cpus=resources.cpus) + if default_instance_type is None: + return ([], []) + else: + return (_make([default_instance_type]), []) assert len(accelerators) == 1, resources acc, acc_count = list(accelerators.items())[0] @@ -368,6 +372,7 @@ def _make(instance_list): acc, acc_count, use_spot=resources.use_spot, + cpus=resources.cpus, region=resources.region, zone=resources.zone, clouds='aws') diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index ab1da9331c0e..299f44584572 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -94,10 +94,10 @@ def is_same_cloud(self, other): return isinstance(other, Azure) @classmethod - def get_default_instance_type(cls) -> str: - # General-purpose instance with 8 vCPUs and 32 GB RAM. - # Intel Ice Lake 8370C - return 'Standard_D8_v5' + def get_default_instance_type(cls, + cpus: Optional[str] = None) -> Optional[str]: + return service_catalog.get_default_instance_type(cpus=cpus, + clouds='azure') def _get_image_config(self, gen_version, instance_type): # az vm image list \ @@ -250,12 +250,11 @@ def get_feasible_launchable_resources(self, resources): # TODO(zhwu): our azure subscription offer ID does not support spot. # Need to support it. return ([], []) - fuzzy_candidate_list = [] if resources.instance_type is not None: assert resources.is_launchable(), resources # Treat Resources(AWS, p3.2x, V100) as Resources(AWS, p3.2x). resources = resources.copy(accelerators=None) - return ([resources], fuzzy_candidate_list) + return ([resources], []) def _make(instance_list): resource_list = [] @@ -265,16 +264,22 @@ def _make(instance_list): instance_type=instance_type, # Setting this to None as Azure doesn't separately bill / # attach the accelerators. Billed as part of the VM type. - accelerators=None) + accelerators=None, + cpus=None, + ) resource_list.append(r) return resource_list # Currently, handle a filter on accelerators only. accelerators = resources.accelerators if accelerators is None: - # No requirements to filter, so just return a default VM type. - return (_make([Azure.get_default_instance_type()]), - fuzzy_candidate_list) + # Return a default instance type with the given number of vCPUs. + default_instance_type = Azure.get_default_instance_type( + cpus=resources.cpus) + if default_instance_type is None: + return ([], []) + else: + return (_make([default_instance_type]), []) assert len(accelerators) == 1, resources acc, acc_count = list(accelerators.items())[0] @@ -282,6 +287,7 @@ def _make(instance_list): ) = service_catalog.get_instance_type_for_accelerator( acc, acc_count, + cpus=resources.cpus, use_spot=resources.use_spot, region=resources.region, zone=resources.zone, diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 19b9865a911b..9c50b93b7b53 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -180,7 +180,18 @@ def get_accelerators_from_instance_type( raise NotImplementedError @classmethod - def get_default_instance_type(cls) -> str: + def get_default_instance_type(cls, + cpus: Optional[str] = None) -> Optional[str]: + """Returns the default instance type with the given number of vCPUs. + + For example, if cpus='4', this method returns the default instance type + with 4 vCPUs. If cpus='4+', this method returns the default instance + type with 4 or more vCPUs. + + When cpus is None, this method will never return None. + This method may return None if the cloud's default instance family + does not have a VM with the given number of vCPUs (e.g., when cpus='7'). + """ raise NotImplementedError @classmethod diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index 24dc3155aa22..28c793086781 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -279,10 +279,10 @@ def get_image_size(self, image_id: str, region: Optional[str]) -> float: raise @classmethod - def get_default_instance_type(cls) -> str: - # General-purpose instance with 8 vCPUs and 32 GB RAM. - # Intel Ice Lake 8373C or Cascade Lake 6268CL - return 'n2-standard-8' + def get_default_instance_type(cls, + cpus: Optional[str] = None) -> Optional[str]: + return service_catalog.get_default_instance_type(cpus=cpus, + clouds='gcp') @classmethod def _get_default_region(cls) -> clouds.Region: @@ -377,45 +377,73 @@ def make_deploy_resources_variables( return resources_vars def get_feasible_launchable_resources(self, resources): - fuzzy_candidate_list = [] if resources.instance_type is not None: assert resources.is_launchable(), resources - return ([resources], fuzzy_candidate_list) + return ([resources], []) - # No other resources (cpu/mem) to filter for now, so just return a - # default VM type. - host_vm_type = GCP.get_default_instance_type() - acc_dict = None - # Find instance candidates to meet user's requirements - if resources.accelerators is not None: - assert len(resources.accelerators.items( - )) == 1, 'cannot handle more than one accelerator candidates.' - acc, acc_count = list(resources.accelerators.items())[0] - (instance_list, fuzzy_candidate_list - ) = service_catalog.get_instance_type_for_accelerator( - acc, - acc_count, - use_spot=resources.use_spot, - region=resources.region, - zone=resources.zone, - clouds='gcp') - - if instance_list is None: - return ([], fuzzy_candidate_list) - assert len( - instance_list - ) == 1, f'More than one instance type matched, {instance_list}' + if resources.accelerators is None: + # Return a default instance type with the given number of vCPUs. + host_vm_type = GCP.get_default_instance_type(cpus=resources.cpus) + if host_vm_type is None: + return ([], []) + else: + r = resources.copy( + cloud=GCP(), + instance_type=host_vm_type, + accelerators=None, + cpus=None, + ) + return ([r], []) + + use_tpu_vm = False + if resources.accelerator_args is not None: + use_tpu_vm = resources.accelerator_args.get('tpu_vm', False) + # Find instance candidates to meet user's requirements + assert len(resources.accelerators.items() + ) == 1, 'cannot handle more than one accelerator candidates.' + acc, acc_count = list(resources.accelerators.items())[0] + + # For TPU VMs, the instance type is fixed to 'TPU-VM'. However, we still + # need to call the below function to get the fuzzy candidate list. + (instance_list, fuzzy_candidate_list + ) = service_catalog.get_instance_type_for_accelerator( + acc, + acc_count, + cpus=resources.cpus if not use_tpu_vm else None, + use_spot=resources.use_spot, + region=resources.region, + zone=resources.zone, + clouds='gcp') + + if instance_list is None: + return ([], fuzzy_candidate_list) + assert len( + instance_list + ) == 1, f'More than one instance type matched, {instance_list}' + + if use_tpu_vm: + host_vm_type = 'TPU-VM' + # FIXME(woosuk): This leverages the fact that TPU VMs have 96 vCPUs. + num_cpus_in_tpu_vm = 96 + if resources.cpus is not None: + if resources.cpus.endswith('+'): + cpus = float(resources.cpus[:-1]) + if cpus > num_cpus_in_tpu_vm: + return ([], fuzzy_candidate_list) + else: + cpus = float(resources.cpus) + if cpus != num_cpus_in_tpu_vm: + return ([], fuzzy_candidate_list) + else: host_vm_type = instance_list[0] - acc_dict = {acc: acc_count} - if resources.accelerator_args is not None: - use_tpu_vm = resources.accelerator_args.get('tpu_vm', False) - if use_tpu_vm: - host_vm_type = 'TPU-VM' + + acc_dict = {acc: acc_count} r = resources.copy( cloud=GCP(), instance_type=host_vm_type, accelerators=acc_dict, + cpus=None, ) return ([r], fuzzy_candidate_list) diff --git a/sky/clouds/local.py b/sky/clouds/local.py index 627358f7dc17..bad0d79b8c74 100644 --- a/sky/clouds/local.py +++ b/sky/clouds/local.py @@ -94,8 +94,9 @@ def is_same_cloud(self, other: clouds.Cloud) -> bool: return isinstance(other, Local) @classmethod - def get_default_instance_type(cls) -> str: + def get_default_instance_type(cls, cpus: Optional[str] = None) -> str: # There is only "1" instance type for local cloud: on-prem + del cpus # Unused. return Local._DEFAULT_INSTANCE_TYPE @classmethod diff --git a/sky/clouds/service_catalog/__init__.py b/sky/clouds/service_catalog/__init__.py index a722701cc53b..00680e2c2941 100644 --- a/sky/clouds/service_catalog/__init__.py +++ b/sky/clouds/service_catalog/__init__.py @@ -159,6 +159,17 @@ def get_vcpus_from_instance_type(instance_type: str, instance_type) +def get_default_instance_type(cpus: Optional[str] = None, + clouds: CloudFilter = None) -> Optional[str]: + """Returns the cloud's default instance type for the given number of vCPUs. + + For example, if cpus='4', this method returns the default instance type + with 4 vCPUs. If cpus='4+', this method returns the default instance + type with 4 or more vCPUs. + """ + return _map_clouds_catalog(clouds, 'get_default_instance_type', cpus) + + def get_accelerators_from_instance_type( instance_type: str, clouds: CloudFilter = None) -> Optional[Dict[str, int]]: @@ -170,6 +181,7 @@ def get_accelerators_from_instance_type( def get_instance_type_for_accelerator( acc_name: str, acc_count: int, + cpus: Optional[str] = None, use_spot: bool = False, region: Optional[str] = None, zone: Optional[str] = None, @@ -180,7 +192,8 @@ def get_instance_type_for_accelerator( accelerators with sorted prices and a list of candidates with fuzzy search. """ return _map_clouds_catalog(clouds, 'get_instance_type_for_accelerator', - acc_name, acc_count, use_spot, region, zone) + acc_name, acc_count, cpus, use_spot, region, + zone) def get_accelerator_hourly_cost( diff --git a/sky/clouds/service_catalog/aws_catalog.py b/sky/clouds/service_catalog/aws_catalog.py index 150e45b384e0..dfcba50848ad 100644 --- a/sky/clouds/service_catalog/aws_catalog.py +++ b/sky/clouds/service_catalog/aws_catalog.py @@ -19,6 +19,12 @@ logger = sky_logging.init_logger(__name__) +# This is the latest general-purpose instance family as of Jan 2023. +# CPU: Intel Ice Lake 8375C. +# Memory: 4 GiB RAM per 1 vCPU. +_DEFAULT_INSTANCE_FAMILY = 'm6i' +_DEFAULT_NUM_VCPUS = 8 + # Keep it synced with the frequency in # skypilot-catalog/.github/workflows/update-aws-catalog.yml _PULL_FREQUENCY_HOURS = 7 @@ -94,6 +100,14 @@ def get_vcpus_from_instance_type(instance_type: str) -> Optional[float]: return common.get_vcpus_from_instance_type_impl(_df, instance_type) +def get_default_instance_type(cpus: Optional[str] = None) -> Optional[str]: + if cpus is None: + cpus = str(_DEFAULT_NUM_VCPUS) + instance_type_prefix = f'{_DEFAULT_INSTANCE_FAMILY}.' + df = _df[_df['InstanceType'].str.startswith(instance_type_prefix)] + return common.get_instance_type_for_cpus_impl(df, cpus) + + def get_accelerators_from_instance_type( instance_type: str) -> Optional[Dict[str, int]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) @@ -102,6 +116,7 @@ def get_accelerators_from_instance_type( def get_instance_type_for_accelerator( acc_name: str, acc_count: int, + cpus: Optional[str] = None, use_spot: bool = False, region: Optional[str] = None, zone: Optional[str] = None, @@ -113,6 +128,7 @@ def get_instance_type_for_accelerator( return common.get_instance_type_for_accelerator_impl(df=_df, acc_name=acc_name, acc_count=acc_count, + cpus=cpus, use_spot=use_spot, region=region, zone=zone) diff --git a/sky/clouds/service_catalog/azure_catalog.py b/sky/clouds/service_catalog/azure_catalog.py index c07f8e8ac7c5..a3428c6791bb 100644 --- a/sky/clouds/service_catalog/azure_catalog.py +++ b/sky/clouds/service_catalog/azure_catalog.py @@ -3,6 +3,7 @@ This module loads the service catalog file and can be used to query instance types and pricing information for Azure. """ +import re from typing import Dict, List, Optional, Tuple from sky import clouds as cloud_lib @@ -11,6 +12,12 @@ _df = common.read_catalog('azure/vms.csv') +# This is the latest general-purpose instance family as of Jan 2023. +# CPU: Intel Ice Lake 8370C. +# Memory: 4 GiB RAM per 1 vCPU. +_DEFAULT_INSTANCE_FAMILY = 'D_v5' +_DEFAULT_NUM_VCPUS = 8 + def instance_type_exists(instance_type: str) -> bool: return common.instance_type_exists_impl(_df, instance_type) @@ -53,6 +60,37 @@ def get_vcpus_from_instance_type(instance_type: str) -> Optional[float]: return common.get_vcpus_from_instance_type_impl(_df, instance_type) +def _get_instance_family(instance_type: str) -> str: + if instance_type.startswith('Basic_A'): + return 'basic_a' + + assert instance_type.startswith('Standard_') + # Remove the 'Standard_' prefix. + instance_type = instance_type[len('Standard_'):] + # Remove the '_Promo' suffix if exists. + if '_Promo' in instance_type: + instance_type = instance_type[:-len('_Promo')] + + # TODO(woosuk): Use better regex. + if '-' in instance_type: + x = re.match(r'([A-Za-z]+)([0-9]+)(-)([0-9]+)(.*)', instance_type) + assert x is not None, x + instance_family = x.group(1) + '_' + x.group(5) + else: + x = re.match(r'([A-Za-z]+)([0-9]+)(.*)', instance_type) + assert x is not None, x + instance_family = x.group(1) + x.group(3) + return instance_family + + +def get_default_instance_type(cpus: Optional[str] = None) -> Optional[str]: + if cpus is None: + cpus = str(_DEFAULT_NUM_VCPUS) + df = _df[_df['InstanceType'].apply(_get_instance_family) == + _DEFAULT_INSTANCE_FAMILY] + return common.get_instance_type_for_cpus_impl(df, cpus) + + def get_accelerators_from_instance_type( instance_type: str) -> Optional[Dict[str, int]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) @@ -61,6 +99,7 @@ def get_accelerators_from_instance_type( def get_instance_type_for_accelerator( acc_name: str, acc_count: int, + cpus: Optional[str] = None, use_spot: bool = False, region: Optional[str] = None, zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: @@ -74,6 +113,7 @@ def get_instance_type_for_accelerator( return common.get_instance_type_for_accelerator_impl(df=_df, acc_name=acc_name, acc_count=acc_count, + cpus=cpus, use_spot=use_spot, region=region, zone=zone) diff --git a/sky/clouds/service_catalog/common.py b/sky/clouds/service_catalog/common.py index 84b2c67be125..d0f83597b7be 100644 --- a/sky/clouds/service_catalog/common.py +++ b/sky/clouds/service_catalog/common.py @@ -270,6 +270,39 @@ def get_vcpus_from_instance_type_impl( return float(vcpus) +def _filter_with_cpus(df: pd.DataFrame, cpus: Optional[str]) -> pd.DataFrame: + if cpus is None: + return df + + # The following code is redundant with the code in resources.py::_set_cpus() + # but we add it here for safety. + if cpus.endswith('+'): + num_cpus_str = cpus[:-1] + else: + num_cpus_str = cpus + try: + num_cpus = float(num_cpus_str) + except ValueError: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'The "cpus" field should be either a number or ' + f'a string "+". Found: {cpus!r}') from None + + if cpus.endswith('+'): + return df[df['vCPUs'] >= num_cpus] + else: + return df[df['vCPUs'] == num_cpus] + + +def get_instance_type_for_cpus_impl( + df: pd.DataFrame, cpus: Optional[str] = None) -> Optional[str]: + df = _filter_with_cpus(df, cpus) + if df.empty: + return None + # Sort by the number of vCPUs and then by the price. + df = df.sort_values(by=['vCPUs', 'Price'], ascending=True) + return df['InstanceType'].iloc[0] + + def get_accelerators_from_instance_type_impl( df: pd.DataFrame, instance_type: str, @@ -289,6 +322,7 @@ def get_instance_type_for_accelerator_impl( df: pd.DataFrame, acc_name: str, acc_count: int, + cpus: Optional[str] = None, use_spot: bool = False, region: Optional[str] = None, zone: Optional[str] = None, @@ -313,6 +347,7 @@ def get_instance_type_for_accelerator_impl( f'{int(row["AcceleratorCount"])}') return (None, fuzzy_candidate_list) + result = _filter_with_cpus(result, cpus) if region is not None: result = result[result['Region'] == region] if zone is not None: diff --git a/sky/clouds/service_catalog/gcp_catalog.py b/sky/clouds/service_catalog/gcp_catalog.py index b2cb8f4b8939..01fbea392568 100644 --- a/sky/clouds/service_catalog/gcp_catalog.py +++ b/sky/clouds/service_catalog/gcp_catalog.py @@ -25,6 +25,13 @@ 'asia-east1', ] +# Default instance family for CPU-only VMs. +# This is the latest general-purpose instance family as of Jan 2023. +# CPU: Intel Ice Lake 8373C or Cascade Lake 6268CL. +# Memory: 4 GiB RAM per 1 vCPU. +_DEFAULT_INSTANCE_FAMILY = 'n2-standard' +_DEFAULT_NUM_VCPUS = 8 + # This can be switched between n1 and n2. # n2 is not allowed for launching GPUs. _DEFAULT_HOST_VM_FAMILY = 'n1' @@ -164,9 +171,19 @@ def get_vcpus_from_instance_type(instance_type: str) -> Optional[float]: return common.get_vcpus_from_instance_type_impl(_df, instance_type) +def get_default_instance_type(cpus: Optional[str] = None) -> Optional[str]: + if cpus is None: + cpus = str(_DEFAULT_NUM_VCPUS) + instance_type_prefix = f'{_DEFAULT_INSTANCE_FAMILY}-' + df = _df[_df['InstanceType'].notna()] + df = df[df['InstanceType'].str.startswith(instance_type_prefix)] + return common.get_instance_type_for_cpus_impl(df, cpus) + + def get_instance_type_for_accelerator( acc_name: str, acc_count: int, + cpus: Optional[str] = None, use_spot: bool = False, region: Optional[str] = None, zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: @@ -178,20 +195,48 @@ def get_instance_type_for_accelerator( """ (instance_list, fuzzy_candidate_list) = common.get_instance_type_for_accelerator_impl( - _df, acc_name, acc_count, use_spot, region, zone) + _df, acc_name, acc_count, cpus, use_spot, region, zone) if instance_list is None: return None, fuzzy_candidate_list if acc_name in _A100_INSTANCE_TYPE_DICTS: # If A100 is used, host VM type must be A2. # https://cloud.google.com/compute/docs/gpus#a100-gpus + + # FIXME(woosuk): This uses the knowledge that the A2 machines provide + # 12 vCPUs per GPU, except for a2-megagpu-16g which has 16 GPUs. + if cpus is not None: + num_a2_cpus = min(12 * acc_count, 96) + if cpus.endswith('+'): + if num_a2_cpus < float(cpus[:-1]): + return None, [] + else: + if num_a2_cpus != float(cpus): + return None, [] return [_A100_INSTANCE_TYPE_DICTS[acc_name][acc_count]], [] + if acc_name not in _NUM_ACC_TO_NUM_CPU: acc_name = 'DEFAULT' - num_cpus = _NUM_ACC_TO_NUM_CPU[acc_name].get(acc_count, None) - # The (acc_name, acc_count) should be validated in the caller. - assert num_cpus is not None, (acc_name, acc_count) + assert _DEFAULT_HOST_VM_FAMILY == 'n1' + num_cpus = None + if cpus is None: + num_cpus = _NUM_ACC_TO_NUM_CPU[acc_name].get(acc_count, None) + else: + # FIXME(woosuk): This uses the knowledge that the N1-highmem machines + # have 2, 4, 8, 16, 32, 64, or 96 vCPUs. + for num_n1_cpus in [2, 4, 8, 16, 32, 64, 96]: + if cpus.endswith('+'): + if num_n1_cpus >= float(cpus[:-1]): + num_cpus = num_n1_cpus + break + else: + if num_n1_cpus == float(cpus): + num_cpus = num_n1_cpus + break + if num_cpus is None: + return None, [] + mem_type = 'highmem' # patches for the number of cores per GPU, as some of the combinations # are not supported by GCP. @@ -403,7 +448,7 @@ def check_accelerator_attachable_to_host(instance_type: str, acc_name, acc_count = acc[0] if acc_name.startswith('tpu-'): - # TODO(woosuk): Check max vcpus and memory for each TPU type. + # TODO(woosuk): Check max vCPUs and memory for each TPU type. assert instance_type == 'TPU-VM' or instance_type.startswith('n1-') return diff --git a/sky/execution.py b/sky/execution.py index 8164f107c33e..8eb85451bae8 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -176,6 +176,15 @@ def _execute( existing_handle = global_user_state.get_handle_from_cluster_name( cluster_name) cluster_exists = existing_handle is not None + if cluster_exists: + assert len(task.resources) == 1 + task_resources = list(task.resources)[0] + if task_resources.cpus is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'Cannot specify CPU when using an existing cluster. ' + 'CPU is only used for selecting the instance type when ' + 'creating a new cluster.') stages = stages if stages is not None else list(Stage) diff --git a/sky/optimizer.py b/sky/optimizer.py index 3a9189fa5456..7d5d4aede50b 100644 --- a/sky/optimizer.py +++ b/sky/optimizer.py @@ -960,6 +960,10 @@ def _fill_in_launchable_resources( f'{colorama.Fore.CYAN}' f'{sorted(all_fuzzy_candidates)}' f'{colorama.Style.RESET_ALL}') + elif resources.cpus is not None: + logger.info('Try specifying a different CPU count, ' + 'or add "+" to the end of the CPU count ' + 'to allow for larger instances.') launchable[resources] = _filter_out_blocked_launchable_resources( launchable[resources], blocked_resources) diff --git a/sky/resources.py b/sky/resources.py index 10baeaae93ff..f5b960c7465e 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -41,16 +41,15 @@ class Resources: # TODO: sky.Resources(requests={'mem': '16g', 'cpu': 8}) """ - # If any fields changed: - # 1. Increment the version. For backward compatibility. - # 2. Change the __setstate__ method to handle the new fields. - # 3. Modify the to_config method to handle the new fields. - _VERSION = 6 + # If any fields changed, increment the version. For backward compatibility, + # modify the __setstate__ method to handle the old version. + _VERSION = 7 def __init__( self, cloud: Optional[clouds.Cloud] = None, instance_type: Optional[str] = None, + cpus: Union[None, int, float, str] = None, accelerators: Union[None, str, Dict[str, int]] = None, accelerator_args: Optional[Dict[str, str]] = None, use_spot: Optional[bool] = None, @@ -96,10 +95,12 @@ def __init__( k.strip(): v.strip() for k, v in image_id.items() } + self._set_cpus(cpus) self._set_accelerators(accelerators, accelerator_args) self._try_validate_local() self._try_validate_instance_type() + self._try_validate_cpus() self._try_validate_accelerators() self._try_validate_spot() self._try_validate_image_id() @@ -135,6 +136,10 @@ def __repr__(self) -> str: if self.accelerator_args is not None: accelerator_args = f', accelerator_args={self.accelerator_args}' + cpus = '' + if self.cpus is not None: + cpus = f', cpus={self.cpus}' + if isinstance(self.cloud, clouds.Local): return f'{self.cloud}({self.accelerators})' @@ -160,7 +165,7 @@ def __repr__(self) -> str: hardware_str = ( f'{instance_type}{use_spot}' - f'{accelerators}{accelerator_args}{image_id}{disk_size}') + f'{cpus}{accelerators}{accelerator_args}{image_id}{disk_size}') # It may have leading ',' (for example, instance_type not set) or empty # spaces. Remove them. while hardware_str and hardware_str[0] in (',', ' '): @@ -188,6 +193,19 @@ def zone(self): def instance_type(self): return self._instance_type + @property + def cpus(self) -> Optional[str]: + """Returns the number of vCPUs that each instance must have. + + For example, cpus='4' means each instance must have exactly 4 vCPUs, + and cpus='4+' means each instance must have at least 4 vCPUs. + + (Developer note: The cpus field is only used to select the instance type + at launch time. Thus, Resources in the backend's ResourceHandle will + always have the cpus field set to None.) + """ + return self._cpus + @property def accelerators(self) -> Optional[Dict[str, int]]: """Returns the accelerators field directly or by inferring. @@ -227,6 +245,36 @@ def disk_size(self) -> int: def image_id(self) -> Optional[Dict[str, str]]: return self._image_id + def _set_cpus( + self, + cpus: Union[None, int, float, str], + ) -> None: + if cpus is None: + self._cpus = None + return + + self._cpus = str(cpus) + if isinstance(cpus, str): + if cpus.endswith('+'): + num_cpus_str = cpus[:-1] + else: + num_cpus_str = cpus + + try: + num_cpus = float(num_cpus_str) + except ValueError: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'The "cpus" field should be either a number or ' + f'a string "+". Found: {cpus!r}') from None + else: + num_cpus = float(cpus) + + if num_cpus <= 0: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'The "cpus" field should be positive. Found: {cpus!r}') + def _set_accelerators( self, accelerators: Union[None, str, Dict[str, int]], @@ -375,6 +423,30 @@ def _try_validate_instance_type(self) -> None: f'inferred from the instance_type {self.instance_type!r}.') self._cloud = valid_clouds[0] + def _try_validate_cpus(self) -> None: + if self.cpus is None: + return + if self.instance_type is not None: + # The assertion should be true because we have already executed + # _try_validate_instance_type() before this method. + # The _try_validate_instance_type() method infers and sets + # self.cloud if self.instance_type is not None. + assert self.cloud is not None + cpus = self.cloud.get_vcpus_from_instance_type(self.instance_type) + if self.cpus.endswith('+'): + if cpus < float(self.cpus[:-1]): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'{self.instance_type} does not have enough vCPUs. ' + f'{self.instance_type} has {cpus} vCPUs, ' + f'but {self.cpus} is requested.') + elif cpus != float(self.cpus): + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'{self.instance_type} does not have the requested ' + f'number of vCPUs. {self.instance_type} has {cpus} ' + f'vCPUs, but {self.cpus} is requested.') + def _try_validate_accelerators(self) -> None: """Validate accelerators against the instance type and region/zone.""" acc_requested = self.accelerators @@ -526,62 +598,6 @@ def get_cost(self, seconds: float) -> float: self.accelerators, self.use_spot, self._region, self._zone) return hourly_cost * hours - def is_same_resources(self, other: 'Resources') -> bool: - """Returns whether two resources are the same. - - Returns True if they are the same, False if not. - """ - if (self.cloud is None) != (other.cloud is None): - # self and other's cloud should be both None or both not None - return False - - if self.cloud is not None and not self.cloud.is_same_cloud(other.cloud): - return False - # self.cloud == other.cloud - - if (self.region is None) != (other.region is None): - # self and other's region should be both None or both not None - return False - - if self.region is not None and self.region != other.region: - return False - # self.region <= other.region - - if (self.zone is None) != (other.zone is None): - # self and other's zone should be both None or both not None - return False - - if self.zone is not None and self.zone != other.zone: - return False - - if (self.image_id is None) != (other.image_id is None): - # self and other's image id should be both None or both not None - return False - - if (self.image_id is not None and self.image_id != other.image_id): - return False - - if (self._instance_type is not None and - self._instance_type != other.instance_type): - return False - # self._instance_type == other.instance_type - - other_accelerators = other.accelerators - accelerators = self.accelerators - if accelerators != other_accelerators: - return False - # self.accelerators == other.accelerators - - if self.accelerator_args != other.accelerator_args: - return False - # self.accelerator_args == other.accelerator_args - - if self.use_spot != other.use_spot: - return False - - # self == other - return True - def less_demanding_than(self, other: Union[List['Resources'], 'Resources'], requested_num_nodes: int = 1) -> bool: @@ -677,6 +693,7 @@ def is_empty(self) -> bool: return all([ self.cloud is None, self._instance_type is None, + self.cpus is None, self.accelerators is None, self.accelerator_args is None, not self._use_spot_specified, @@ -688,6 +705,7 @@ def copy(self, **override) -> 'Resources': resources = Resources( cloud=override.pop('cloud', self.cloud), instance_type=override.pop('instance_type', self.instance_type), + cpus=override.pop('cpus', self.cpus), accelerators=override.pop('accelerators', self.accelerators), accelerator_args=override.pop('accelerator_args', self.accelerator_args), @@ -726,6 +744,8 @@ def from_yaml_config(cls, config: Optional[Dict[str, str]]) -> 'Resources': config.pop('cloud')) if config.get('instance_type') is not None: resources_fields['instance_type'] = config.pop('instance_type') + if config.get('cpus') is not None: + resources_fields['cpus'] = str(config.pop('cpus')) if config.get('accelerators') is not None: resources_fields['accelerators'] = config.pop('accelerators') if config.get('accelerator_args') is not None: @@ -759,6 +779,7 @@ def add_if_not_none(key, value): add_if_not_none('cloud', str(self.cloud)) add_if_not_none('instance_type', self.instance_type) + add_if_not_none('cpus', self.cpus) add_if_not_none('accelerators', self.accelerators) add_if_not_none('accelerator_args', self.accelerator_args) @@ -818,6 +839,9 @@ def __setstate__(self, state): } state['_accelerators'] = accelerators + if version < 7: + self._cpus = None + image_id = state.get('_image_id', None) if isinstance(image_id, str): state['_image_id'] = {state.get('_region', None): image_id} diff --git a/sky/task.py b/sky/task.py index 613f23edb147..d9f4642232b3 100644 --- a/sky/task.py +++ b/sky/task.py @@ -448,6 +448,7 @@ def set_resources( """ if isinstance(resources, sky.Resources): resources = {resources} + # TODO(woosuk): Check if the resources are None. self.resources = resources return self diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index cdd1865aaaff..c71c83b0e8f6 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -25,6 +25,13 @@ def get_resources_schema(): 'zone': { 'type': 'string', }, + 'cpus': { + 'anyOf': [{ + 'type': 'string', + }, { + 'type': 'number', + }], + }, 'accelerators': { 'anyOf': [{ 'type': 'string', diff --git a/tests/test_optimizer_dryruns.py b/tests/test_optimizer_dryruns.py index 079d28232a02..16fd6c6e93d5 100644 --- a/tests/test_optimizer_dryruns.py +++ b/tests/test_optimizer_dryruns.py @@ -9,6 +9,15 @@ from sky import exceptions +def _test_parse_cpus(spec, expected_cpus): + with tempfile.NamedTemporaryFile('w') as f: + f.write(spec) + f.flush() + with sky.Dag(): + task = sky.Task.from_yaml(f.name) + assert list(task.resources)[0].cpus == expected_cpus + + def _test_parse_accelerators(spec, expected_accelerators): with tempfile.NamedTemporaryFile('w') as f: f.write(spec) @@ -90,6 +99,12 @@ def test_resources_gcp(monkeypatch): _test_resources_launch(monkeypatch, sky.GCP(), 'n1-standard-16') +def test_partial_cpus(monkeypatch): + _test_resources_launch(monkeypatch, cpus=4) + _test_resources_launch(monkeypatch, cpus='4') + _test_resources_launch(monkeypatch, cpus='7+') + + def test_partial_k80(monkeypatch): _test_resources_launch(monkeypatch, accelerators='K80') @@ -148,6 +163,41 @@ def test_clouds_not_enabled(monkeypatch): enabled_clouds=[sky.AWS()]) +def test_instance_type_mismatches_cpus(monkeypatch): + bad_instance_and_cpus = [ + # Actual: 8 + ('m6i.2xlarge', 4), + # Actual: 2 + ('c6i.large', 4), + ] + for instance, cpus in bad_instance_and_cpus: + with pytest.raises(ValueError) as e: + _test_resources_launch(monkeypatch, + sky.AWS(), + instance_type=instance, + cpus=cpus) + assert 'does not have the requested number of vCPUs' in str(e.value) + + +def test_instance_type_matches_cpus(monkeypatch): + _test_resources_launch(monkeypatch, + sky.AWS(), + instance_type='c6i.8xlarge', + cpus=32) + _test_resources_launch(monkeypatch, + sky.Azure(), + instance_type='Standard_E8s_v5', + cpus='8') + _test_resources_launch(monkeypatch, + sky.GCP(), + instance_type='n1-standard-8', + cpus='7+') + _test_resources_launch(monkeypatch, + sky.AWS(), + instance_type='g4dn.2xlarge', + cpus=8.0) + + def test_instance_type_mistmatches_accelerators(monkeypatch): bad_instance_and_accs = [ # Actual: V100 @@ -209,6 +259,13 @@ def test_infer_cloud_from_instance_type(monkeypatch): expected_cloud=sky.Azure()) +def test_invalid_cpus(monkeypatch): + for cloud in [sky.AWS(), sky.Azure(), sky.GCP(), None]: + with pytest.raises(ValueError) as e: + _test_resources(monkeypatch, cloud, cpus='invalid') + assert '"cpus" field should be' in str(e.value) + + def test_invalid_region(monkeypatch): for cloud in [sky.AWS(), sky.Azure(), sky.GCP()]: with pytest.raises(ValueError) as e: @@ -287,6 +344,23 @@ def test_valid_image(monkeypatch): ) +def test_parse_cpus_from_yaml(): + spec = textwrap.dedent("""\ + resources: + cpus: 1""") + _test_parse_cpus(spec, '1') + + spec = textwrap.dedent("""\ + resources: + cpus: 1.5""") + _test_parse_cpus(spec, '1.5') + + spec = textwrap.dedent("""\ + resources: + cpus: '3+' """) + _test_parse_cpus(spec, '3+') + + def test_parse_accelerators_from_yaml(): spec = textwrap.dedent("""\ resources: