Adding ModelEvaluationJob and Pipeline Based Service #3

Open · wants to merge 18 commits into base: main
10 changes: 9 additions & 1 deletion google/cloud/aiplatform/__init__.py
@@ -45,13 +45,19 @@
from google.cloud.aiplatform.metadata import metadata
from google.cloud.aiplatform.models import Endpoint
from google.cloud.aiplatform.models import Model
from google.cloud.aiplatform.model_evaluation import ModelEvaluation
from google.cloud.aiplatform.model_evaluation import (
ModelEvaluation,
ModelEvaluationJob,
)
from google.cloud.aiplatform.jobs import (
BatchPredictionJob,
CustomJob,
HyperparameterTuningJob,
)
from google.cloud.aiplatform.pipeline_jobs import PipelineJob
from google.cloud.aiplatform._pipeline_based_service import (
_VertexAiPipelineBasedService,
)
from google.cloud.aiplatform.tensorboard import (
Tensorboard,
TensorboardExperiment,
@@ -115,12 +121,14 @@
"HyperparameterTuningJob",
"Model",
"ModelEvaluation",
"ModelEvaluationJob",
"PipelineJob",
"TabularDataset",
"Tensorboard",
"TensorboardExperiment",
"TensorboardRun",
"TextDataset",
"TimeSeriesDataset",
"_VertexAiPipelineBasedService",
"VideoDataset",
)
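
For orientation, a minimal sketch (assuming the SDK is installed from this branch) of how the new names surface on the top-level namespace after this change:

from google.cloud import aiplatform

# ModelEvaluationJob is now exported next to ModelEvaluation.
print(aiplatform.ModelEvaluation)
print(aiplatform.ModelEvaluationJob)

# The pipeline-based service base class is exported under a private name,
# signalling it is meant to be subclassed inside the SDK rather than used
# directly by callers.
print(aiplatform._VertexAiPipelineBasedService)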
22 changes: 22 additions & 0 deletions google/cloud/aiplatform/_pipeline_based_service/__init__.py
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from google.cloud.aiplatform._pipeline_based_service.pipeline_based_service import (
_VertexAiPipelineBasedService,
)

__all__ = ("_VertexAiPipelineBasedService",)
267 changes: 267 additions & 0 deletions google/cloud/aiplatform/_pipeline_based_service/pipeline_based_service.py
@@ -0,0 +1,267 @@
# -*- coding: utf-8 -*-

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc
from google.auth import credentials as auth_credentials

from google.cloud.aiplatform import base
from google.cloud.aiplatform import utils
from google.cloud.aiplatform import pipeline_jobs
from google.cloud.aiplatform.utils import yaml_utils

from google.cloud.aiplatform.compat.types import (
pipeline_job_v1 as gca_pipeline_job_v1,
)

from typing import (
Any,
Dict,
Optional,
List,
)

_LOGGER = base.Logger(__name__)


class _VertexAiPipelineBasedService(base.VertexAiStatefulResource):
"""Base class for Vertex AI Pipeline based services."""

client_class = utils.PipelineJobClientWithOverride
_resource_noun = "pipelineJob"
_delete_method = "delete_pipeline_job"
_getter_method = "get_pipeline_job"
_list_method = "list_pipeline_jobs"
Review comment from @sararob (Owner, Apr 26, 2022): Should maybe override this to do a client-side filter

_parse_resource_name_method = "parse_pipeline_job_path"
_format_resource_name_method = "pipeline_job_path"

_valid_done_states = pipeline_jobs._PIPELINE_COMPLETE_STATES

@property
@classmethod
@abc.abstractmethod
def _template_ref(self) -> str:
"""The pipeline template URL for this service."""
pass

@property
@abc.abstractmethod
def _metadata_output_artifact(self) -> Optional[str]:
"""The ML Metadata output artifact resource URI from the completed pipeline run."""
pass

@property
def backing_pipeline_job(self) -> pipeline_jobs.PipelineJob:
"""The PipelineJob associated with the resource."""
return pipeline_jobs.PipelineJob.get(resource_name=self.resource_name)

@property
def pipeline_console_uri(self) -> str:
"""The console URI of the PipelineJob created by the service."""
if self.backing_pipeline_job:
return self.backing_pipeline_job._dashboard_uri()

@property
def state(self) -> Optional[str]:
"""The state of the Pipeline run associated with the service."""
if self.backing_pipeline_job:
return self.backing_pipeline_job.state
return None

def _validate_pipeline_template_matches_service(
self, pipeline_job: pipeline_jobs.PipelineJob
):
"""Utility function to validate that the passed in pipeline ID matches
the template of the Pipeline Based Service.
Raises:
ValueError: if the provided pipeline ID doesn't match the pipeline service.
"""
# TODO: should this validate the whole pipelineSpec or just the components level?
service_pipeline_json = yaml_utils.load_yaml(self._template_ref)[
"pipelineSpec"
]["components"]
current_pipeline_json = pipeline_job.to_dict()["pipelineSpec"]["components"]

if current_pipeline_json != service_pipeline_json:
raise ValueError(
f"The provided pipeline template is not compatible with {self.__class__.__name__}"
)

def __init__(
self,
pipeline_job_id: str,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
):
"""Retrieves an existing Pipeline Based Service given the ID of the pipeline execution.
Example Usage:
pipeline_service = aiplatform._pipeline_based_service._VertexAiPipelineBasedService(
pipeline_job_id = "projects/123/locations/us-central1/pipelinesJobs/456"
)
            pipeline_service = aiplatform._VertexAiPipelineBasedService(
pipeline_job_id = "456"
)
Args:
            pipeline_job_id (str):
Required. A fully-qualified pipeline job run ID.
Example: "projects/123/locations/us-central1/pipelineJobs/456" or
"456" when project and location are initialized or passed.
project (str):
Optional. Project to retrieve pipeline job from. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to retrieve pipeline job from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to retrieve this pipeline job. Overrides
credentials set in aiplatform.init.
Raises:
ValueError: if the pipeline template used in this PipelineJob is not consistent with the _template_ref defined on the subclass.
"""

super().__init__(
project=project,
location=location,
credentials=credentials,
resource_name=pipeline_job_id,
)

job_resource = pipeline_jobs.PipelineJob.get(resource_name=pipeline_job_id)

self._validate_pipeline_template_matches_service(job_resource)

self._gca_resource = gca_pipeline_job_v1.PipelineJob(name=pipeline_job_id)

@classmethod
def _create_and_submit_pipeline_job(
cls,
template_params: Dict[str, Any],
pipeline_root: str,
display_name: Optional[str] = None,
job_id: Optional[str] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
) -> "_VertexAiPipelineBasedService":
"""Create a new PipelineJob using the provided template and parameters.
Args:
template_params (Dict[str, Any]):
Required. The parameters to pass to the given pipeline template.
            pipeline_root (str):
                Required. The GCS directory to store the pipeline run output.
            display_name (str):
Optional. The user-defined name of the PipelineJob created by this Pipeline Based Service.
job_id (str):
Optional. The unique ID of the job run.
If not specified, pipeline name + timestamp will be used.
service_account (str):
Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
network (str):
The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
project (str):
Optional. The project to run this PipelineJob in. If not set,
the project set in aiplatform.init will be used.
location (str):
Optional. Location to create PipelineJob. If not set,
location set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to create the PipelineJob.
Overrides credentials set in aiplatform.init.
Returns:
            (_VertexAiPipelineBasedService):
Instantiated representation of a Vertex AI Pipeline based service.
"""

if not display_name:
display_name = cls._generate_display_name()

self = cls._empty_constructor(
project=project,
location=location,
credentials=credentials,
)

service_pipeline_job = pipeline_jobs.PipelineJob(
display_name=display_name,
template_path=self._template_ref,
job_id=job_id,
pipeline_root=pipeline_root,
parameter_values=template_params,
project=project,
location=location,
credentials=credentials,
)

service_pipeline_job.submit(
service_account=service_account,
network=network,
)

self._gca_resource = self._get_gca_resource(service_pipeline_job.resource_name)

return self

@classmethod
def list(
cls,
project: Optional[str] = None,
location: Optional[str] = None,
        credentials: Optional[auth_credentials.Credentials] = None,
) -> List["pipeline_jobs.PipelineJob"]:
"""Lists all PipelineJob resources associated with this Pipeline Based service.
Args:
project (str):
Optional. The project to retrieve the Pipeline Based Services from. If not set,
the project set in aiplatform.init will be used.
location (str):
Optional. Location to retrieve the Pipeline Based Services from. If not set,
location set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to retrieve the Pipeline Based Services from.
Overrides credentials set in aiplatform.init.
Returns:
(List[PipelineJob]):
A list of PipelineJob resource objects.
"""
self = cls._empty_constructor(
project=project,
location=location,
credentials=credentials,
)

# TODO: this takes a long time for projects with many pipeline executions. Is there a faster way to do this?
all_pipeline_jobs = pipeline_jobs.PipelineJob.list(
project=project,
location=location,
credentials=credentials,
)

service_pipeline_jobs = []

for job in all_pipeline_jobs:
try:
self._validate_pipeline_template_matches_service(job)
service_pipeline_jobs.append(job)

            except ValueError:
                # Skip pipeline runs created from a different template.
                continue

        return service_pipeline_jobs
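
To illustrate the extension pattern this base class is designed for, below is a minimal, hypothetical subclass sketch. The class name, template URL, parameter names, and the submit wrapper are invented for illustration and are not part of this PR (ModelEvaluationJob is the concrete subclass added here):

from typing import Optional

from google.cloud.aiplatform import _pipeline_based_service


class _ExamplePipelineService(_pipeline_based_service._VertexAiPipelineBasedService):
    """Hypothetical pipeline-based service pinned to a single template."""

    # Hypothetical template location; a real subclass points at its published
    # pipeline template (see _template_ref on the base class).
    _template_ref = "gs://example-bucket/templates/example_pipeline.json"

    @property
    def _metadata_output_artifact(self) -> Optional[str]:
        # A real subclass would resolve this from the completed pipeline run.
        return None

    @classmethod
    def submit(cls, pipeline_root: str, **kwargs) -> "_ExamplePipelineService":
        # Delegates to the base-class helper, which builds a PipelineJob from
        # _template_ref, submits it, and returns the wrapping service object.
        return cls._create_and_submit_pipeline_job(
            template_params={"example_param": "value"},  # hypothetical parameters
            pipeline_root=pipeline_root,
            **kwargs,
        )

Under these assumptions, the returned object exposes the base-class properties defined above (backing_pipeline_job, pipeline_console_uri, state) for tracking the submitted run.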
8 changes: 7 additions & 1 deletion google/cloud/aiplatform/model_evaluation/__init__.py
@@ -16,5 +16,11 @@
#

from google.cloud.aiplatform.model_evaluation.model_evaluation import ModelEvaluation
from google.cloud.aiplatform.model_evaluation.model_evaluation_job import (
ModelEvaluationJob,
)

__all__ = ("ModelEvaluation",)
__all__ = (
"ModelEvaluation",
"ModelEvaluationJob",
)
6 changes: 4 additions & 2 deletions google/cloud/aiplatform/model_evaluation/model_evaluation.py
@@ -16,10 +16,10 @@
#

from google.auth import credentials as auth_credentials
from google.cloud import aiplatform

from google.cloud.aiplatform import base
from google.cloud.aiplatform import utils
from google.cloud.aiplatform import models
from google.protobuf import struct_pb2

from typing import Optional
@@ -82,7 +82,7 @@ def __init__(

self._gca_resource = self._get_gca_resource(
resource_name=evaluation_name,
parent_resource_name_fields={models.Model._resource_noun: model_id}
parent_resource_name_fields={aiplatform.Model._resource_noun: model_id}
if model_id
else model_id,
)
@@ -91,3 +91,5 @@ def delete(self):
raise NotImplementedError(
"Deleting a model evaluation has not been implemented yet."
)

# TODO: decide if we should add coverage for the gapic import_model_evaluation method