From 1474d988fa63cbbb3b200634719bc245cab6a448 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Wed, 3 Jul 2024 12:30:03 -0700 Subject: [PATCH] feat: Migrate DeploymentResourcePool and associated functionality to V1 namespace for GA launch of model co-hosting. PiperOrigin-RevId: 649164615 --- google/cloud/aiplatform/compat/__init__.py | 8 + .../aiplatform/compat/services/__init__.py | 4 + .../cloud/aiplatform/compat/types/__init__.py | 4 + google/cloud/aiplatform/models.py | 702 +++++++++++++++--- google/cloud/aiplatform/preview/models.py | 4 +- google/cloud/aiplatform/utils/__init__.py | 7 +- .../test_deployment_resource_pools.py | 12 +- tests/unit/aiplatform/test_endpoints.py | 77 +- tests/unit/aiplatform/test_models.py | 76 +- 9 files changed, 785 insertions(+), 109 deletions(-) diff --git a/google/cloud/aiplatform/compat/__init__.py b/google/cloud/aiplatform/compat/__init__.py index b3fee8f643..2c460ddcc1 100644 --- a/google/cloud/aiplatform/compat/__init__.py +++ b/google/cloud/aiplatform/compat/__init__.py @@ -80,6 +80,9 @@ types.dataset_service = types.dataset_service_v1beta1 types.deployed_model_ref = types.deployed_model_ref_v1beta1 types.deployment_resource_pool = types.deployment_resource_pool_v1beta1 + types.deployment_resource_pool_service = ( + types.deployment_resource_pool_service_v1beta1 + ) types.encryption_spec = types.encryption_spec_v1beta1 types.endpoint = types.endpoint_v1beta1 types.endpoint_service = types.endpoint_service_v1beta1 @@ -159,6 +162,9 @@ if DEFAULT_VERSION == V1: services.dataset_service_client = services.dataset_service_client_v1 + services.deployment_resource_pool_service_client = ( + services.deployment_resource_pool_service_client_v1 + ) services.endpoint_service_client = services.endpoint_service_client_v1 services.feature_online_store_admin_service_client = ( services.feature_online_store_admin_service_client_v1 @@ -205,6 +211,8 @@ types.dataset = types.dataset_v1 types.dataset_service = types.dataset_service_v1 types.deployed_model_ref = types.deployed_model_ref_v1 + types.deployment_resource_pool = types.deployment_resource_pool_v1 + types.deployment_resource_pool_service = types.deployment_resource_pool_service_v1 types.encryption_spec = types.encryption_spec_v1 types.endpoint = types.endpoint_v1 types.endpoint_service = types.endpoint_service_v1 diff --git a/google/cloud/aiplatform/compat/services/__init__.py b/google/cloud/aiplatform/compat/services/__init__.py index 497e762403..f87ac22475 100644 --- a/google/cloud/aiplatform/compat/services/__init__.py +++ b/google/cloud/aiplatform/compat/services/__init__.py @@ -116,6 +116,9 @@ from google.cloud.aiplatform_v1.services.dataset_service import ( client as dataset_service_client_v1, ) +from google.cloud.aiplatform_v1.services.deployment_resource_pool_service import ( + client as deployment_resource_pool_service_client_v1, +) from google.cloud.aiplatform_v1.services.endpoint_service import ( client as endpoint_service_client_v1, ) @@ -180,6 +183,7 @@ __all__ = ( # v1 dataset_service_client_v1, + deployment_resource_pool_service_client_v1, endpoint_service_client_v1, feature_online_store_service_client_v1, feature_online_store_admin_service_client_v1, diff --git a/google/cloud/aiplatform/compat/types/__init__.py b/google/cloud/aiplatform/compat/types/__init__.py index 20bf0de92a..5fa73056bc 100644 --- a/google/cloud/aiplatform/compat/types/__init__.py +++ b/google/cloud/aiplatform/compat/types/__init__.py @@ -124,6 +124,8 @@ dataset_service as dataset_service_v1, deployed_index_ref as 
matching_engine_deployed_index_ref_v1, deployed_model_ref as deployed_model_ref_v1, + deployment_resource_pool as deployment_resource_pool_v1, + deployment_resource_pool_service as deployment_resource_pool_service_v1, encryption_spec as encryption_spec_v1, endpoint as endpoint_v1, endpoint_service as endpoint_service_v1, @@ -206,6 +208,8 @@ dataset_v1, dataset_service_v1, deployed_model_ref_v1, + deployment_resource_pool_v1, + deployment_resource_pool_service_v1, encryption_spec_v1, endpoint_v1, endpoint_service_v1, diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 12c65b82f7..fd2ff225cf 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import itertools import json import pathlib import re @@ -53,8 +54,13 @@ from google.cloud.aiplatform.utils import _ipython_utils from google.cloud.aiplatform import model_evaluation from google.cloud.aiplatform.compat.services import endpoint_service_client +from google.cloud.aiplatform.compat.services import ( + deployment_resource_pool_service_client, +) from google.cloud.aiplatform.compat.types import ( + deployment_resource_pool as gca_deployment_resource_pool_compat, + deployed_model_ref as gca_deployed_model_ref_compat, encryption_spec as gca_encryption_spec, endpoint as gca_endpoint_compat, explanation as gca_explanation_compat, @@ -166,6 +172,393 @@ class Prediction(NamedTuple): explanations: Optional[Sequence[gca_explanation_compat.Explanation]] = None +class DeploymentResourcePool(base.VertexAiResourceNounWithFutureManager): + client_class = utils.DeploymentResourcePoolClientWithOverride + _resource_noun = "deploymentResourcePools" + _getter_method = "get_deployment_resource_pool" + _list_method = "list_deployment_resource_pools" + _delete_method = "delete_deployment_resource_pool" + _parse_resource_name_method = "parse_deployment_resource_pool_path" + _format_resource_name_method = "deployment_resource_pool_path" + + def __init__( + self, + deployment_resource_pool_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ): + """Retrieves a DeploymentResourcePool. + + Args: + deployment_resource_pool_name (str): + Required. The fully-qualified resource name or ID of the + deployment resource pool. Example: + "projects/123/locations/us-central1/deploymentResourcePools/456" + or "456" when project and location are initialized or passed. + project (str): + Optional. Project containing the deployment resource pool to + retrieve. If not set, the project given to `aiplatform.init` + will be used. + location (str): + Optional. Location containing the deployment resource pool to + retrieve. If not set, the location given to `aiplatform.init` + will be used. + credentials: Optional[auth_credentials.Credentials]=None, + Custom credentials to use to retrieve the deployment resource + pool. If not set, the credentials given to `aiplatform.init` + will be used. 
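+ + Example:: + + # Illustrative usage only: "456" is an assumed pool ID, and + # project/location are taken from a prior aiplatform.init() call. + drp = models.DeploymentResourcePool("456")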
+ """ + + super().__init__( + project=project, + location=location, + credentials=credentials, + resource_name=deployment_resource_pool_name, + ) + + deployment_resource_pool_name = utils.full_resource_name( + resource_name=deployment_resource_pool_name, + resource_noun=self._resource_noun, + parse_resource_name_method=self._parse_resource_name, + format_resource_name_method=self._format_resource_name, + project=project, + location=location, + ) + + self._gca_resource = self._get_gca_resource( + resource_name=deployment_resource_pool_name + ) + + @classmethod + def create( + cls, + deployment_resource_pool_id: str, + project: Optional[str] = None, + location: Optional[str] = None, + metadata: Sequence[Tuple[str, str]] = (), + credentials: Optional[auth_credentials.Credentials] = None, + machine_type: Optional[str] = None, + min_replica_count: int = 1, + max_replica_count: int = 1, + accelerator_type: Optional[str] = None, + accelerator_count: Optional[int] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, + sync=True, + create_request_timeout: Optional[float] = None, + ) -> "DeploymentResourcePool": + """Creates a new DeploymentResourcePool. + + Args: + deployment_resource_pool_id (str): + Required. User-specified name for the new deployment resource + pool. + project (str): + Optional. Project containing the deployment resource pool to + retrieve. If not set, the project given to `aiplatform.init` + will be used. + location (str): + Optional. Location containing the deployment resource pool to + retrieve. If not set, the location given to `aiplatform.init` + will be used. + metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as + metadata. + credentials: Optional[auth_credentials.Credentials]=None, + Optional. Custom credentials to use to retrieve the deployment + resource pool. If not set, the credentials given to + `aiplatform.init` will be used. + machine_type (str): + Optional. Machine type to use for the deployment resource pool. + If not set, the default machine type of `n1-standard-2` is + used. + min_replica_count (int): + Optional. The minimum replica count of the new deployment + resource pool. Each replica serves a copy of each model deployed + on the deployment resource pool. If this value is less than + `max_replica_count`, then autoscaling is enabled, and the actual + number of replicas will be adjusted to bring resource usage in + line with the autoscaling targets. + max_replica_count (int): + Optional. The maximum replica count of the new deployment + resource pool. + accelerator_type (str): + Optional. Hardware accelerator type. Must also set accelerator_ + count if used. One of NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, + NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, or + NVIDIA_TESLA_A100. + accelerator_count (int): + Optional. The number of accelerators attached to each replica. + autoscaling_target_cpu_utilization (int): + Optional. Target CPU utilization value for autoscaling. A + default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Optional. Target accelerator duty cycle percentage to use for + autoscaling. Must also set accelerator_type and accelerator + count if specified. A default value of 60 will be used if + accelerators are requested and this is not specified. + sync (bool): + Optional. Whether to execute this method synchronously. 
If + False, this method will be executed in a concurrent Future and + any downstream object will be immediately returned and synced + when the Future has completed. + create_request_timeout (float): + Optional. The create request timeout in seconds. + + Returns: + DeploymentResourcePool + """ + + api_client = cls._instantiate_client(location=location, credentials=credentials) + + project = project or initializer.global_config.project + location = location or initializer.global_config.location + + return cls._create( + api_client=api_client, + deployment_resource_pool_id=deployment_resource_pool_id, + project=project, + location=location, + metadata=metadata, + credentials=credentials, + machine_type=machine_type, + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + accelerator_type=accelerator_type, + accelerator_count=accelerator_count, + autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization, + autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, + sync=sync, + create_request_timeout=create_request_timeout, + ) + + @classmethod + @base.optional_sync() + def _create( + cls, + api_client: deployment_resource_pool_service_client.DeploymentResourcePoolServiceClient, + deployment_resource_pool_id: str, + project: Optional[str] = None, + location: Optional[str] = None, + metadata: Sequence[Tuple[str, str]] = (), + credentials: Optional[auth_credentials.Credentials] = None, + machine_type: Optional[str] = None, + min_replica_count: int = 1, + max_replica_count: int = 1, + accelerator_type: Optional[str] = None, + accelerator_count: Optional[int] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, + sync=True, + create_request_timeout: Optional[float] = None, + ) -> "DeploymentResourcePool": + """Creates a new DeploymentResourcePool. + + Args: + api_client (DeploymentResourcePoolServiceClient): + Required. DeploymentResourcePoolServiceClient used to make the + underlying CreateDeploymentResourcePool API call. + deployment_resource_pool_id (str): + Required. User-specified name for the new deployment resource + pool. + project (str): + Optional. Project in which to create the deployment resource + pool. If not set, the project given to `aiplatform.init` + will be used. + location (str): + Optional. Location in which to create the deployment resource + pool. If not set, the location given to `aiplatform.init` + will be used. + metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as + metadata. + credentials: Optional[auth_credentials.Credentials]=None, + Optional. Custom credentials to use to create the deployment + resource pool. If not set, the credentials given to + `aiplatform.init` will be used. + machine_type (str): + Optional. Machine type to use for the deployment resource pool. + If not set, the default machine type of `n1-standard-2` is + used. + min_replica_count (int): + Optional. The minimum replica count of the new deployment + resource pool. Each replica serves a copy of each model deployed + on the deployment resource pool. If this value is less than + `max_replica_count`, then autoscaling is enabled, and the actual + number of replicas will be adjusted to bring resource usage in + line with the autoscaling targets. + max_replica_count (int): + Optional. The maximum replica count of the new deployment + resource pool. + accelerator_type (str): + Optional. 
Hardware accelerator type. Must also set + accelerator_count if used. One of NVIDIA_TESLA_K80, + NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, + NVIDIA_TESLA_T4, or NVIDIA_TESLA_A100. + accelerator_count (int): + Optional. The number of accelerators attached to each replica. + autoscaling_target_cpu_utilization (int): + Optional. Target CPU utilization value for autoscaling. A + default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Optional. Target accelerator duty cycle percentage to use for + autoscaling. Must also set accelerator_type and + accelerator_count if specified. A default value of 60 will be + used if accelerators are requested and this is not specified. + sync (bool): + Optional. Whether to execute this method synchronously. If + False, this method will be executed in a concurrent Future and + any downstream object will be immediately returned and synced + when the Future has completed. + create_request_timeout (float): + Optional. The create request timeout in seconds. + + Returns: + DeploymentResourcePool + """ + + parent = initializer.global_config.common_location_path( + project=project, location=location + ) + + dedicated_resources = gca_machine_resources_compat.DedicatedResources( + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + ) + + machine_spec = gca_machine_resources_compat.MachineSpec( + machine_type=machine_type + ) + + if autoscaling_target_cpu_utilization: + autoscaling_metric_spec = ( + gca_machine_resources_compat.AutoscalingMetricSpec( + metric_name=( + "aiplatform.googleapis.com/prediction/online/cpu/utilization" + ), + target=autoscaling_target_cpu_utilization, + ) + ) + dedicated_resources.autoscaling_metric_specs.extend( + [autoscaling_metric_spec] + ) + + if accelerator_type and accelerator_count: + utils.validate_accelerator_type(accelerator_type) + machine_spec.accelerator_type = accelerator_type + machine_spec.accelerator_count = accelerator_count + + if autoscaling_target_accelerator_duty_cycle: + autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( + metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle", + target=autoscaling_target_accelerator_duty_cycle, + ) + dedicated_resources.autoscaling_metric_specs.extend( + [autoscaling_metric_spec] + ) + + dedicated_resources.machine_spec = machine_spec + + gapic_drp = gca_deployment_resource_pool_compat.DeploymentResourcePool( + dedicated_resources=dedicated_resources + ) + + operation_future = api_client.create_deployment_resource_pool( + parent=parent, + deployment_resource_pool=gapic_drp, + deployment_resource_pool_id=deployment_resource_pool_id, + metadata=metadata, + timeout=create_request_timeout, + ) + + _LOGGER.log_create_with_lro(cls, operation_future) + + created_drp = operation_future.result() + + _LOGGER.log_create_complete(cls, created_drp, "deployment resource pool") + + return cls._construct_sdk_resource_from_gapic( + gapic_resource=created_drp, + project=project, + location=location, + credentials=credentials, + ) + + def query_deployed_models( + self, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> List[gca_deployed_model_ref_compat.DeployedModelRef]: + """Lists the deployed models using this resource pool. + + Args: + project (str): + Optional. Project to retrieve list from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional. 
Location to retrieve list from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to retrieve list. Overrides + credentials set in aiplatform.init. + + Returns: + List of DeployedModelRef objects containing the endpoint ID and + deployed model ID of the deployed models using this resource pool. + """ + location = location or initializer.global_config.location + api_client = DeploymentResourcePool._instantiate_client( + location=location, credentials=credentials + ) + response = api_client.query_deployed_models( + deployment_resource_pool=self.resource_name + ) + return list( + itertools.chain.from_iterable( + page.deployed_model_refs for page in response.pages + ) + ) + + @classmethod + def list( + cls, + filter: Optional[str] = None, + order_by: Optional[str] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> List["models.DeploymentResourcePool"]: + """Lists the deployment resource pools. + + Args: + filter (str): + Optional. An expression for filtering the results of the request. + For field names both snake_case and camelCase are supported. + order_by (str): + Optional. A comma-separated list of fields to order by, sorted in + ascending order. Use "desc" after a field name for descending. + Supported fields: `display_name`, `create_time`, `update_time` + project (str): + Optional. Project to retrieve list from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional. Location to retrieve list from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to retrieve list. Overrides + credentials set in aiplatform.init. + + Returns: + List of deployment resource pools. + """ + return cls._list( + filter=filter, + order_by=order_by, + project=project, + location=location, + credentials=credentials, + ) + + class Endpoint(base.VertexAiResourceNounWithFutureManager, base.PreviewMixin): client_class = utils.EndpointClientWithOverride _resource_noun = "endpoints" @@ -705,12 +1098,13 @@ def _unallocate_traffic( @staticmethod def _validate_deploy_args( - min_replica_count: int, - max_replica_count: int, + min_replica_count: Optional[int], + max_replica_count: Optional[int], accelerator_type: Optional[str], deployed_model_display_name: Optional[str], traffic_split: Optional[Dict[str, int]], traffic_percentage: Optional[int], + deployment_resource_pool: Optional[DeploymentResourcePool], ): """Helper method to validate deploy arguments. @@ -753,15 +1147,45 @@ def _validate_deploy_args( not be provided. Traffic of previously deployed models at the endpoint will be scaled down to accommodate new deployed model's traffic. Should not be provided if traffic_split is provided. + deployment_resource_pool (DeploymentResourcePool): Optional. + Resource pool where the model will be deployed. All models that + are deployed to the same DeploymentResourcePool will be hosted in + a shared model server. If provided, will override replica count + arguments. Raises: ValueError: if Min or Max replica is negative. Traffic percentage > 100 or < 0. Or if traffic_split does not sum to 100. 
""" - if min_replica_count < 0: - raise ValueError("Min replica cannot be negative.") - if max_replica_count < 0: - raise ValueError("Max replica cannot be negative.") + if deployment_resource_pool: + # Validate that replica count and deployment resource pool are not + # both specified. + if ( + min_replica_count + and min_replica_count != 1 + or max_replica_count + and max_replica_count != 1 + ): + raise ValueError( + "Ignoring explicitly specified replica counts, " + "since deployment_resource_pool was also given." + ) + if accelerator_type: + raise ValueError( + "Conflicting deployment parameters were given." + "deployment_resource_pool may not be specified at the same" + "time as accelerator_type. " + ) + else: + # Validate that a non-negative replica count is given, and validate + # the accelerator type. + if min_replica_count < 0: + raise ValueError("Min replica cannot be negative.") + if max_replica_count < 0: + raise ValueError("Max replica cannot be negative.") + if accelerator_type: + utils.validate_accelerator_type(accelerator_type) + if deployed_model_display_name is not None: utils.validate_display_name(deployed_model_display_name) @@ -777,10 +1201,6 @@ def _validate_deploy_args( "Sum of all traffic within traffic split needs to be 100." ) - # Raises ValueError if invalid accelerator - if accelerator_type: - utils.validate_accelerator_type(accelerator_type) - def deploy( self, model: "Model", @@ -805,6 +1225,7 @@ def deploy( autoscaling_target_accelerator_duty_cycle: Optional[int] = None, enable_access_logging=False, disable_container_logging: bool = False, + deployment_resource_pool: Optional[DeploymentResourcePool] = None, ) -> None: """Deploys a Model to the Endpoint. @@ -893,6 +1314,11 @@ def deploy( disable_container_logging (bool): If True, container logs from the deployed model will not be written to Cloud Logging. Defaults to False. + deployment_resource_pool (DeploymentResourcePool): + Resource pool where the model will be deployed. All models that + are deployed to the same DeploymentResourcePool will be hosted in + a shared model server. If provided, will override replica count + arguments. """ self._sync_gca_resource_if_skipped() @@ -903,6 +1329,7 @@ def deploy( deployed_model_display_name=deployed_model_display_name, traffic_split=traffic_split, traffic_percentage=traffic_percentage, + deployment_resource_pool=deployment_resource_pool, ) explanation_spec = _explanation_utils.create_and_validate_explanation_spec( @@ -930,6 +1357,7 @@ def deploy( autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, enable_access_logging=enable_access_logging, disable_container_logging=disable_container_logging, + deployment_resource_pool=deployment_resource_pool, ) @base.optional_sync() @@ -954,6 +1382,7 @@ def _deploy( autoscaling_target_accelerator_duty_cycle: Optional[int] = None, enable_access_logging=False, disable_container_logging: bool = False, + deployment_resource_pool: Optional[DeploymentResourcePool] = None, ) -> None: """Deploys a Model to the Endpoint. @@ -1036,6 +1465,11 @@ def _deploy( disable_container_logging (bool): If True, container logs from the deployed model will not be written to Cloud Logging. Defaults to False. + deployment_resource_pool (DeploymentResourcePool): + Resource pool where the model will be deployed. All models that + are deployed to the same DeploymentResourcePool will be hosted in + a shared model server. If provided, will override replica count + arguments. 
""" _LOGGER.log_action_start_against_resource( f"Deploying Model {model.resource_name} to", "", self @@ -1064,6 +1498,7 @@ def _deploy( autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, enable_access_logging=enable_access_logging, disable_container_logging=disable_container_logging, + deployment_resource_pool=deployment_resource_pool, ) _LOGGER.log_action_completed_against_resource("model", "deployed", self) @@ -1095,6 +1530,7 @@ def _deploy_call( autoscaling_target_accelerator_duty_cycle: Optional[int] = None, enable_access_logging=False, disable_container_logging: bool = False, + deployment_resource_pool: Optional[DeploymentResourcePool] = None, ) -> None: """Helper method to deploy model to endpoint. @@ -1184,126 +1620,178 @@ def _deploy_call( disable_container_logging (bool): If True, container logs from the deployed model will not be written to Cloud Logging. Defaults to False. + deployment_resource_pool (DeploymentResourcePool): + Resource pool where the model will be deployed. All models that + are deployed to the same DeploymentResourcePool will be hosted in + a shared model server. If provided, will override replica count + arguments. Raises: ValueError: If only `accelerator_type` or `accelerator_count` is specified. ValueError: If model does not support deployment. ValueError: If there is not current traffic split and traffic percentage is not 0 or 100. + ValueError: If `deployment_resource_pool` and a custom machine spec + are both present. + ValueError: If both `explanation_spec` and `deployment_resource_pool` + are present. """ - service_account = service_account or initializer.global_config.service_account - max_replica_count = max(min_replica_count, max_replica_count) + if deployment_resource_pool: + deployed_model = gca_endpoint_compat.DeployedModel( + model=model.versioned_resource_name, + display_name=deployed_model_display_name, + service_account=service_account, + disable_container_logging=disable_container_logging, + ) - if bool(accelerator_type) != bool(accelerator_count): - raise ValueError( - "Both `accelerator_type` and `accelerator_count` should be specified or None." + supports_shared_resources = ( + gca_model_compat.Model.DeploymentResourcesType.SHARED_RESOURCES + in model.supported_deployment_resources_types ) - if autoscaling_target_accelerator_duty_cycle is not None and ( - not accelerator_type or not accelerator_count - ): - raise ValueError( - "Both `accelerator_type` and `accelerator_count` should be set " - "when specifying autoscaling_target_accelerator_duty_cycle`" + if not supports_shared_resources: + raise ValueError( + "`deployment_resource_pool` may only be specified for models " + " which support shared resources." + ) + + provided_custom_machine_spec = ( + machine_type + or accelerator_type + or accelerator_count + or autoscaling_target_accelerator_duty_cycle + or autoscaling_target_cpu_utilization ) - deployed_model = gca_endpoint_compat.DeployedModel( - model=model.versioned_resource_name, - display_name=deployed_model_display_name, - service_account=service_account, - enable_access_logging=enable_access_logging, - disable_container_logging=disable_container_logging, - ) + if provided_custom_machine_spec: + raise ValueError( + "Conflicting parameters in deployment request. " + "The machine_type, accelerator_type and accelerator_count," + "autoscaling_target_accelerator_duty_cycle," + "autoscaling_target_cpu_utilization parameters may not be set " + "when `deployment_resource_pool` is specified." 
+ ) - supports_automatic_resources = ( - gca_model_compat.Model.DeploymentResourcesType.AUTOMATIC_RESOURCES - in model.supported_deployment_resources_types - ) - supports_dedicated_resources = ( - gca_model_compat.Model.DeploymentResourcesType.DEDICATED_RESOURCES - in model.supported_deployment_resources_types - ) - provided_custom_machine_spec = ( - machine_type - or accelerator_type - or accelerator_count - or autoscaling_target_accelerator_duty_cycle - or autoscaling_target_cpu_utilization - ) + deployed_model.shared_resources = deployment_resource_pool.resource_name - # If the model supports both automatic and dedicated deployment resources, - # decide based on the presence of machine spec customizations - use_dedicated_resources = supports_dedicated_resources and ( - not supports_automatic_resources or provided_custom_machine_spec - ) + if explanation_spec: + raise ValueError( + "Model explanation is not supported for deployments using " + "shared resources." + ) + else: + max_replica_count = max(min_replica_count, max_replica_count) - if provided_custom_machine_spec and not use_dedicated_resources: - _LOGGER.info( - "Model does not support dedicated deployment resources. " - "The machine_type, accelerator_type and accelerator_count," - "autoscaling_target_accelerator_duty_cycle," - "autoscaling_target_cpu_utilization parameters are ignored." - ) + if bool(accelerator_type) != bool(accelerator_count): + raise ValueError( + "Both `accelerator_type` and `accelerator_count` should be specified or None." + ) - if use_dedicated_resources and not machine_type: - machine_type = _DEFAULT_MACHINE_TYPE - _LOGGER.info(f"Using default machine_type: {machine_type}") + if autoscaling_target_accelerator_duty_cycle is not None and ( + not accelerator_type or not accelerator_count + ): + raise ValueError( + "Both `accelerator_type` and `accelerator_count` should be set " + "when specifying `autoscaling_target_accelerator_duty_cycle`." + ) - if use_dedicated_resources: - dedicated_resources = gca_machine_resources_compat.DedicatedResources( - min_replica_count=min_replica_count, - max_replica_count=max_replica_count, + deployed_model = gca_endpoint_compat.DeployedModel( + model=model.versioned_resource_name, + display_name=deployed_model_display_name, + service_account=service_account, + enable_access_logging=enable_access_logging, + disable_container_logging=disable_container_logging, ) - machine_spec = gca_machine_resources_compat.MachineSpec( - machine_type=machine_type + supports_automatic_resources = ( + gca_model_compat.Model.DeploymentResourcesType.AUTOMATIC_RESOURCES + in model.supported_deployment_resources_types + ) + supports_dedicated_resources = ( + gca_model_compat.Model.DeploymentResourcesType.DEDICATED_RESOURCES + in model.supported_deployment_resources_types + ) + provided_custom_machine_spec = ( + machine_type + or accelerator_type + or accelerator_count + or autoscaling_target_accelerator_duty_cycle + or autoscaling_target_cpu_utilization ) - if autoscaling_target_cpu_utilization: - autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( - metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization", - target=autoscaling_target_cpu_utilization, + # If the model supports both automatic and dedicated deployment resources, + # decide based on the presence of machine spec customizations + use_dedicated_resources = supports_dedicated_resources and ( + not supports_automatic_resources or provided_custom_machine_spec + ) + + if provided_custom_machine_spec 
and not use_dedicated_resources: + _LOGGER.info( + "Model does not support dedicated deployment resources. " + "The machine_type, accelerator_type and accelerator_count, " + "autoscaling_target_accelerator_duty_cycle, " + "autoscaling_target_cpu_utilization parameters are ignored." + ) - autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( - metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization", - target=autoscaling_target_cpu_utilization, + + if use_dedicated_resources and not machine_type: + machine_type = _DEFAULT_MACHINE_TYPE + _LOGGER.info(f"Using default machine_type: {machine_type}") + + if use_dedicated_resources: + dedicated_resources = gca_machine_resources_compat.DedicatedResources( + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, ) - dedicated_resources.autoscaling_metric_specs.extend( - [autoscaling_metric_spec] + machine_spec = gca_machine_resources_compat.MachineSpec( + machine_type=machine_type ) - if accelerator_type and accelerator_count: - utils.validate_accelerator_type(accelerator_type) - machine_spec.accelerator_type = accelerator_type - machine_spec.accelerator_count = accelerator_count + if autoscaling_target_cpu_utilization: + autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( + metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization", + target=autoscaling_target_cpu_utilization, + ) + dedicated_resources.autoscaling_metric_specs.extend( + [autoscaling_metric_spec] + ) - if autoscaling_target_accelerator_duty_cycle: + if accelerator_type and accelerator_count: + utils.validate_accelerator_type(accelerator_type) + machine_spec.accelerator_type = accelerator_type + machine_spec.accelerator_count = accelerator_count + + if autoscaling_target_accelerator_duty_cycle: autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( - metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle", - target=autoscaling_target_accelerator_duty_cycle, + metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle", + target=autoscaling_target_accelerator_duty_cycle, ) dedicated_resources.autoscaling_metric_specs.extend( [autoscaling_metric_spec] ) - if tpu_topology is not None: - machine_spec.tpu_topology = tpu_topology - - dedicated_resources.machine_spec = machine_spec - deployed_model.dedicated_resources = dedicated_resources - - elif supports_automatic_resources: - deployed_model.automatic_resources = ( - gca_machine_resources_compat.AutomaticResources( - min_replica_count=min_replica_count, - max_replica_count=max_replica_count, + if tpu_topology is not None: + machine_spec.tpu_topology = tpu_topology + + dedicated_resources.machine_spec = machine_spec + deployed_model.dedicated_resources = dedicated_resources + + elif supports_automatic_resources: + deployed_model.automatic_resources = ( + gca_machine_resources_compat.AutomaticResources( + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + ) + ) + else: + _LOGGER.warning( + "Model does not support deployment. " + "See https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.Model.FIELDS.repeated.google.cloud.aiplatform.v1.Model.DeploymentResourcesType.google.cloud.aiplatform.v1.Model.supported_deployment_resources_types" ) - ) - else: - _LOGGER.warning( - "Model does not support deployment. 
" - "See https://cloud.google.com/vertex-ai/docs/reference/rpc/google.cloud.aiplatform.v1#google.cloud.aiplatform.v1.Model.FIELDS.repeated.google.cloud.aiplatform.v1.Model.DeploymentResourcesType.google.cloud.aiplatform.v1.Model.supported_deployment_resources_types" - ) - deployed_model.explanation_spec = explanation_spec + deployed_model.explanation_spec = explanation_spec # Checking if traffic percentage is valid # TODO(b/221059294) PrivateEndpoint should support traffic split @@ -3165,6 +3653,7 @@ def deploy( deployed_model_display_name=deployed_model_display_name, traffic_split=traffic_split, traffic_percentage=traffic_percentage, + deployment_resource_pool=None, ) explanation_spec = _explanation_utils.create_and_validate_explanation_spec( @@ -4229,6 +4718,7 @@ def deploy( private_service_connect_config: Optional[ PrivateEndpoint.PrivateServiceConnectConfig ] = None, + deployment_resource_pool: Optional[DeploymentResourcePool] = None, ) -> Union[Endpoint, PrivateEndpoint]: """Deploys model to endpoint. Endpoint will be created if unspecified. @@ -4339,6 +4829,11 @@ def deploy( private_service_connect_config (PrivateEndpoint.PrivateServiceConnectConfig): If true, the endpoint can be accessible via [Private Service Connect](https://cloud.google.com/vpc/docs/private-service-connect). Cannot be set together with network. + deployment_resource_pool (DeploymentResourcePool): + Resource pool where the model will be deployed. All models that + are deployed to the same DeploymentResourcePool will be hosted in + a shared model server. If provided, will override replica count + arguments. Returns: endpoint (Union[Endpoint, PrivateEndpoint]): @@ -4356,10 +4851,17 @@ def deploy( deployed_model_display_name=deployed_model_display_name, traffic_split=traffic_split, traffic_percentage=traffic_percentage, + deployment_resource_pool=deployment_resource_pool, ) if isinstance(endpoint, PrivateEndpoint): - if endpoint.network and traffic_split: + if deployment_resource_pool: + raise ValueError( + "Model co-hosting is not supported for PrivateEndpoint. " + "Try calling deploy() without providing `deployment_resource_pool`." + ) + + if traffic_split and endpoint.network: raise ValueError( "Traffic splitting is not yet supported for PSA based PrivateEndpoint. " "Try calling deploy() without providing `traffic_split`. " @@ -4395,6 +4897,7 @@ def deploy( enable_access_logging=enable_access_logging, disable_container_logging=disable_container_logging, private_service_connect_config=private_service_connect_config, + deployment_resource_pool=deployment_resource_pool, ) @base.optional_sync(return_input_arg="endpoint", bind_future_to_self=False) @@ -4424,6 +4927,7 @@ def _deploy( private_service_connect_config: Optional[ PrivateEndpoint.PrivateServiceConnectConfig ] = None, + deployment_resource_pool: Optional[DeploymentResourcePool] = None, ) -> Union[Endpoint, PrivateEndpoint]: """Deploys model to endpoint. Endpoint will be created if unspecified. @@ -4527,6 +5031,11 @@ def _deploy( private_service_connect_config (PrivateEndpoint.PrivateServiceConnectConfig): If true, the endpoint can be accessible via [Private Service Connect](https://cloud.google.com/vpc/docs/private-service-connect). Cannot be set together with network. + deployment_resource_pool (DeploymentResourcePool): + Optional. Resource pool where the model will be deployed. All models that + are deployed to the same DeploymentResourcePool will be hosted in + a shared model server. If provided, will override replica count + arguments. 
Returns: endpoint (Union[Endpoint, PrivateEndpoint]): @@ -4580,6 +5089,7 @@ def _deploy( autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, enable_access_logging=enable_access_logging, disable_container_logging=disable_container_logging, + deployment_resource_pool=deployment_resource_pool, ) _LOGGER.log_action_completed_against_resource("model", "deployed", endpoint) diff --git a/google/cloud/aiplatform/preview/models.py b/google/cloud/aiplatform/preview/models.py index a307c79047..6d176812cb 100644 --- a/google/cloud/aiplatform/preview/models.py +++ b/google/cloud/aiplatform/preview/models.py @@ -23,6 +23,7 @@ from google.cloud import aiplatform from google.cloud.aiplatform import base +from google.cloud.aiplatform import compat from google.cloud.aiplatform import initializer from google.cloud.aiplatform import models from google.cloud.aiplatform import utils @@ -115,7 +116,7 @@ def __init__( project=project, location=location, ) - + self.api_client = self.api_client.select_version(compat.V1BETA1) self._gca_resource = self._get_gca_resource( resource_name=deployment_resource_pool_name ) @@ -979,7 +980,6 @@ def _deploy_call( enable_container_logging=not disable_container_logging, ) - _LOGGER.info(model.supported_deployment_resources_types) supports_shared_resources = ( gca_model_compat.Model.DeploymentResourcesType.SHARED_RESOURCES in model.supported_deployment_resources_types diff --git a/google/cloud/aiplatform/utils/__init__.py b/google/cloud/aiplatform/utils/__init__.py index 371602efa5..ea269dea71 100644 --- a/google/cloud/aiplatform/utils/__init__.py +++ b/google/cloud/aiplatform/utils/__init__.py @@ -70,6 +70,7 @@ ) from google.cloud.aiplatform.compat.services import ( dataset_service_client_v1, + deployment_resource_pool_service_client_v1, endpoint_service_client_v1, feature_online_store_admin_service_client_v1, feature_online_store_service_client_v1, @@ -549,12 +550,16 @@ class DatasetClientWithOverride(ClientWithOverride): class DeploymentResourcePoolClientWithOverride(ClientWithOverride): _is_temporary = True - _default_version = compat.V1BETA1 + _default_version = compat.DEFAULT_VERSION _version_map = ( ( compat.V1BETA1, deployment_resource_pool_service_client_v1beta1.DeploymentResourcePoolServiceClient, ), + ( + compat.V1, + deployment_resource_pool_service_client_v1.DeploymentResourcePoolServiceClient, + ), ) diff --git a/tests/unit/aiplatform/test_deployment_resource_pools.py b/tests/unit/aiplatform/test_deployment_resource_pools.py index 24347562c2..4c07e788d9 100644 --- a/tests/unit/aiplatform/test_deployment_resource_pools.py +++ b/tests/unit/aiplatform/test_deployment_resource_pools.py @@ -26,18 +26,18 @@ from google.cloud import aiplatform from google.cloud.aiplatform import base from google.cloud.aiplatform import initializer -from google.cloud.aiplatform.preview import models +from google.cloud.aiplatform import models from google.cloud.aiplatform.compat.services import ( - deployment_resource_pool_service_client_v1beta1 as deployment_resource_pool_service_client, + deployment_resource_pool_service_client, ) from google.cloud.aiplatform.compat.types import ( - deployed_model_ref_v1beta1 as gca_deployed_model_ref, - deployment_resource_pool_v1beta1 as gca_deployment_resource_pool, - deployment_resource_pool_service_v1beta1 as gca_deployment_resource_pool_service, + deployed_model_ref as gca_deployed_model_ref, + deployment_resource_pool as gca_deployment_resource_pool, + deployment_resource_pool_service as 
gca_deployment_resource_pool_service, endpoint as gca_endpoint, - machine_resources_v1beta1 as gca_machine_resources, + machine_resources as gca_machine_resources, ) diff --git a/tests/unit/aiplatform/test_endpoints.py b/tests/unit/aiplatform/test_endpoints.py index 4185594440..8232b63345 100644 --- a/tests/unit/aiplatform/test_endpoints.py +++ b/tests/unit/aiplatform/test_endpoints.py @@ -30,6 +30,7 @@ from google.cloud.aiplatform import models from google.cloud.aiplatform import utils from google.cloud.aiplatform.compat.services import ( + deployment_resource_pool_service_client_v1, deployment_resource_pool_service_client_v1beta1, endpoint_service_client, endpoint_service_client_v1beta1, @@ -40,6 +41,7 @@ prediction_service_client_v1beta1, ) from google.cloud.aiplatform.compat.types import ( + deployment_resource_pool_v1 as gca_deployment_resource_pool_v1, deployment_resource_pool_v1beta1 as gca_deployment_resource_pool_v1beta1, encryption_spec as gca_encryption_spec, endpoint_service_v1beta1 as gca_endpoint_service_v1beta1, @@ -670,7 +672,7 @@ def predict_async_client_explain_mock(): @pytest.fixture -def get_drp_mock(): +def preview_get_drp_mock(): with mock.patch.object( deployment_resource_pool_service_client_v1beta1.DeploymentResourcePoolServiceClient, "get_deployment_resource_pool", @@ -706,6 +708,43 @@ def get_drp_mock(): yield get_drp_mock +@pytest.fixture +def get_drp_mock(): + with mock.patch.object( + deployment_resource_pool_service_client_v1.DeploymentResourcePoolServiceClient, + "get_deployment_resource_pool", + ) as get_drp_mock: + machine_spec = gca_machine_resources.MachineSpec( + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + ) + + autoscaling_metric_specs = [ + gca_machine_resources.AutoscalingMetricSpec( + metric_name=_TEST_METRIC_NAME_CPU_UTILIZATION, target=70 + ), + gca_machine_resources.AutoscalingMetricSpec( + metric_name=_TEST_METRIC_NAME_GPU_UTILIZATION, target=70 + ), + ] + + dedicated_resources = gca_machine_resources.DedicatedResources( + machine_spec=machine_spec, + min_replica_count=10, + max_replica_count=20, + autoscaling_metric_specs=autoscaling_metric_specs, + ) + + get_drp_mock.return_value = ( + gca_deployment_resource_pool_v1.DeploymentResourcePool( + name=_TEST_DRP_NAME, + dedicated_resources=dedicated_resources, + ) + ) + yield get_drp_mock + + """ ---------------------------------------------------------------------------- Private Endpoint Fixtures @@ -1860,7 +1899,9 @@ def test_deploy_disable_container_logging(self, deploy_model_mock, sync): timeout=None, ) - @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock", "get_drp_mock") + @pytest.mark.usefixtures( + "get_endpoint_mock", "get_model_mock", "preview_get_drp_mock" + ) @pytest.mark.parametrize("sync", [True, False]) def test_preview_deploy_with_deployment_resource_pool( self, preview_deploy_model_mock, sync @@ -1895,6 +1936,38 @@ def test_preview_deploy_with_deployment_resource_pool( timeout=None, ) + @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock", "get_drp_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_deploy_with_deployment_resource_pool(self, deploy_model_mock, sync): + test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME) + test_model = models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.SHARED_RESOURCES, + ) + test_drp = models.DeploymentResourcePool(_TEST_DRP_NAME) + + 
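# Machine spec and replica counts come from the shared pool, so only + # the model and the resource pool are passed to deploy(). + 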
test_endpoint.deploy( + model=test_model, + deployment_resource_pool=test_drp, + sync=sync, + deploy_request_timeout=None, + ) + if not sync: + test_endpoint.wait() + + deployed_model = gca_endpoint.DeployedModel( + shared_resources=_TEST_DRP_NAME, + model=test_model.resource_name, + display_name=None, + ) + deploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model=deployed_model, + traffic_split={"0": 100}, + metadata=(), + timeout=None, + ) + @pytest.mark.parametrize( "model1, model2, model3, percent", [ diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index 987a7234e1..7e3fe8944b 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -43,6 +43,7 @@ from google.cloud.aiplatform.preview import models as preview_models from google.cloud.aiplatform.compat.services import ( + deployment_resource_pool_service_client, deployment_resource_pool_service_client_v1beta1, endpoint_service_client, endpoint_service_client_v1beta1, @@ -53,6 +54,7 @@ from google.cloud.aiplatform.compat.types import ( batch_prediction_job as gca_batch_prediction_job, + deployment_resource_pool as gca_deployment_resource_pool, deployment_resource_pool_v1beta1 as gca_deployment_resource_pool_v1beta1, encryption_spec as gca_encryption_spec, endpoint as gca_endpoint, @@ -1116,7 +1118,7 @@ def mock_request_urlopen(job_spec_json): @pytest.fixture -def get_drp_mock(): +def preview_get_drp_mock(): with mock.patch.object( deployment_resource_pool_service_client_v1beta1.DeploymentResourcePoolServiceClient, "get_deployment_resource_pool", @@ -1152,6 +1154,41 @@ def get_drp_mock(): yield get_drp_mock +@pytest.fixture +def get_drp_mock(): + with mock.patch.object( + deployment_resource_pool_service_client.DeploymentResourcePoolServiceClient, + "get_deployment_resource_pool", + ) as get_drp_mock: + machine_spec = gca_machine_resources.MachineSpec( + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + ) + + autoscaling_metric_specs = [ + gca_machine_resources.AutoscalingMetricSpec( + metric_name=_TEST_METRIC_NAME_CPU_UTILIZATION, target=70 + ), + gca_machine_resources.AutoscalingMetricSpec( + metric_name=_TEST_METRIC_NAME_GPU_UTILIZATION, target=70 + ), + ] + + dedicated_resources = gca_machine_resources.DedicatedResources( + machine_spec=machine_spec, + min_replica_count=10, + max_replica_count=20, + autoscaling_metric_specs=autoscaling_metric_specs, + ) + + get_drp_mock.return_value = gca_deployment_resource_pool.DeploymentResourcePool( + name=_TEST_DRP_NAME, + dedicated_resources=dedicated_resources, + ) + yield get_drp_mock + + @pytest.fixture def preview_deploy_model_mock(): with mock.patch.object( @@ -2264,7 +2301,10 @@ def test_deploy_disable_container_logging(self, deploy_model_mock, sync): ) @pytest.mark.usefixtures( - "get_model_mock", "get_drp_mock", "create_endpoint_mock", "get_endpoint_mock" + "get_model_mock", + "preview_get_drp_mock", + "create_endpoint_mock", + "get_endpoint_mock", ) @pytest.mark.parametrize("sync", [True, False]) def test_preview_deploy_with_deployment_resource_pool( @@ -2298,6 +2338,38 @@ def test_preview_deploy_with_deployment_resource_pool( timeout=None, ) + @pytest.mark.usefixtures( + "get_model_mock", "get_drp_mock", "create_endpoint_mock", "get_endpoint_mock" + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_deploy_with_deployment_resource_pool(self, deploy_model_mock, sync): + test_model = 
models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.SHARED_RESOURCES, + ) + test_drp = models.DeploymentResourcePool(_TEST_DRP_NAME) + + test_endpoint = test_model.deploy( + deployment_resource_pool=test_drp, + sync=sync, + deploy_request_timeout=None, + ) + if not sync: + test_endpoint.wait() + + deployed_model = gca_endpoint.DeployedModel( + shared_resources=_TEST_DRP_NAME, + model=test_model.resource_name, + display_name=None, + ) + deploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model=deployed_model, + traffic_split={"0": 100}, + metadata=(), + timeout=None, + ) + @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_model_mock", "get_batch_prediction_job_mock") def test_init_aiplatform_with_encryption_key_name_and_batch_predict_gcs_source_and_dest(