Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions providers/google/docs/operators/cloud/vertex_ai.rst
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,19 @@ To delete experiment run you can use
:start-after: [START how_to_cloud_vertex_ai_delete_experiment_run_operator]
:end-before: [END how_to_cloud_vertex_ai_delete_experiment_run_operator]

Use Private Service Connect interface
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can configure Private Service Connect interface connections for
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomContainerTrainingJobOperator`,
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomPythonPackageTrainingJobOperator`,
:class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomTrainingJobOperator` and
:class:`~airflow.providers.google.cloud.operators.vertex_ai.ray.CreateRayClusterOperator`
operators in Vertex AI. For doing it you must first configure the PSC interface by following the provided
`documentation <https://cloud.google.com/vertex-ai/docs/general/vpc-psc-i-setup>`__.
Then, specify the PSC configuration in the ``psc_interface_config`` parameter.


Reference
^^^^^^^^^

Expand Down
2 changes: 1 addition & 1 deletion providers/google/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ dependencies = [
# google-cloud-aiplatform doesn't install ray for python 3.12 (issue: https://github.com/googleapis/python-aiplatform/issues/5252).
# Temporarily lock in ray 2.42.0 which is compatible with python 3.12 until linked issue is solved.
# Remove the ray dependency as well as google-cloud-bigquery-storage once linked issue is fixed
"google-cloud-aiplatform[evaluation]>=1.73.0",
"google-cloud-aiplatform[evaluation]>=1.98.0",
"ray[default]>=2.42.0 ; python_version < '3.13'",
"google-cloud-bigquery-storage>=2.31.0 ; python_version < '3.13'",
"google-cloud-alloydb>=0.4.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from google.cloud.aiplatform_v1.services.pipeline_service.pagers import (
ListTrainingPipelinesPager,
)
from google.cloud.aiplatform_v1.types import CustomJob, TrainingPipeline
from google.cloud.aiplatform_v1.types import CustomJob, PscInterfaceConfig, TrainingPipeline


class CustomJobHook(GoogleBaseHook, OperationHelper):
Expand Down Expand Up @@ -317,6 +317,7 @@ def _run_job(
is_default_version: bool | None = None,
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
psc_interface_config: PscInterfaceConfig | None = None,
) -> tuple[models.Model | None, str, str]:
"""Run a training pipeline job and wait until its completion."""
model = job.run(
Expand Down Expand Up @@ -350,6 +351,7 @@ def _run_job(
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
psc_interface_config=psc_interface_config,
)
training_id = self.extract_training_id(job.resource_name)
custom_job_id = self.extract_custom_job_id(
Expand Down Expand Up @@ -574,6 +576,7 @@ def create_custom_container_training_job(
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
psc_interface_config: PscInterfaceConfig | None = None,
) -> tuple[models.Model | None, str, str]:
"""
Create Custom Container Training Job.
Expand Down Expand Up @@ -837,6 +840,8 @@ def create_custom_container_training_job(
:param sync: Whether to execute the AI Platform job synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
"""
self._job = self.get_custom_container_training_job(
project=project_id,
Expand Down Expand Up @@ -896,6 +901,7 @@ def create_custom_container_training_job(
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
psc_interface_config=psc_interface_config,
)

return model, training_id, custom_job_id
Expand Down Expand Up @@ -958,6 +964,7 @@ def create_custom_python_package_training_job(
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
sync=True,
psc_interface_config: PscInterfaceConfig | None = None,
) -> tuple[models.Model | None, str, str]:
"""
Create Custom Python Package Training Job.
Expand Down Expand Up @@ -1220,6 +1227,8 @@ def create_custom_python_package_training_job(
:param sync: Whether to execute the AI Platform job synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
"""
self._job = self.get_custom_python_package_training_job(
project=project_id,
Expand Down Expand Up @@ -1280,6 +1289,7 @@ def create_custom_python_package_training_job(
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
psc_interface_config=psc_interface_config,
)

return model, training_id, custom_job_id
Expand Down Expand Up @@ -1342,6 +1352,7 @@ def create_custom_training_job(
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
sync=True,
psc_interface_config: PscInterfaceConfig | None = None,
) -> tuple[models.Model | None, str, str]:
"""
Create Custom Training Job.
Expand Down Expand Up @@ -1604,6 +1615,8 @@ def create_custom_training_job(
:param sync: Whether to execute the AI Platform job synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
"""
self._job = self.get_custom_training_job(
project=project_id,
Expand Down Expand Up @@ -1664,6 +1677,7 @@ def create_custom_training_job(
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
psc_interface_config=psc_interface_config,
)

return model, training_id, custom_job_id
Expand Down Expand Up @@ -1725,6 +1739,7 @@ def submit_custom_container_training_job(
predefined_split_column_name: str | None = None,
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
psc_interface_config: PscInterfaceConfig | None = None,
) -> CustomContainerTrainingJob:
"""
Create and submit a Custom Container Training Job pipeline, then exit without waiting for it to complete.
Expand Down Expand Up @@ -1985,6 +2000,8 @@ def submit_custom_container_training_job(
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
"""
self._job = self.get_custom_container_training_job(
project=project_id,
Expand Down Expand Up @@ -2043,6 +2060,7 @@ def submit_custom_container_training_job(
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
sync=False,
psc_interface_config=psc_interface_config,
)
return self._job

Expand Down Expand Up @@ -2104,6 +2122,7 @@ def submit_custom_python_package_training_job(
is_default_version: bool | None = None,
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
psc_interface_config: PscInterfaceConfig | None = None,
) -> CustomPythonPackageTrainingJob:
"""
Create and submit a Custom Python Package Training Job pipeline, then exit without waiting for it to complete.
Expand Down Expand Up @@ -2363,6 +2382,8 @@ def submit_custom_python_package_training_job(
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
"""
self._job = self.get_custom_python_package_training_job(
project=project_id,
Expand Down Expand Up @@ -2422,6 +2443,7 @@ def submit_custom_python_package_training_job(
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
sync=False,
psc_interface_config=psc_interface_config,
)

return self._job
Expand Down Expand Up @@ -2484,6 +2506,7 @@ def submit_custom_training_job(
predefined_split_column_name: str | None = None,
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
psc_interface_config: PscInterfaceConfig | None = None,
) -> CustomTrainingJob:
"""
Create and submit a Custom Training Job pipeline, then exit without waiting for it to complete.
Expand Down Expand Up @@ -2747,6 +2770,8 @@ def submit_custom_training_job(
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
"""
self._job = self.get_custom_training_job(
project=project_id,
Expand Down Expand Up @@ -2806,6 +2831,7 @@ def submit_custom_training_job(
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
sync=False,
psc_interface_config=psc_interface_config,
)
return self._job

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
CustomPythonPackageTrainingJob,
CustomTrainingJob,
)
from google.cloud.aiplatform_v1.types import PscInterfaceConfig

from airflow.utils.context import Context

Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(
predefined_split_column_name: str | None = None,
timestamp_split_column_name: str | None = None,
tensorboard: str | None = None,
psc_interface_config: PscInterfaceConfig | None = None,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
Expand Down Expand Up @@ -166,6 +168,7 @@ def __init__(
self.predefined_split_column_name = predefined_split_column_name
self.timestamp_split_column_name = timestamp_split_column_name
self.tensorboard = tensorboard
self.psc_interface_config = psc_interface_config
# END Run param
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
Expand Down Expand Up @@ -473,6 +476,8 @@ class CreateCustomContainerTrainingJobOperator(CustomTrainingJobBaseOperator):
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
Expand Down Expand Up @@ -586,6 +591,7 @@ def execute(self, context: Context):
timestamp_split_column_name=self.timestamp_split_column_name,
tensorboard=self.tensorboard,
sync=True,
psc_interface_config=self.psc_interface_config,
)

if model:
Expand Down Expand Up @@ -652,6 +658,7 @@ def invoke_defer(self, context: Context) -> None:
predefined_split_column_name=self.predefined_split_column_name,
timestamp_split_column_name=self.timestamp_split_column_name,
tensorboard=self.tensorboard,
psc_interface_config=self.psc_interface_config,
)
custom_container_training_job_obj.wait_for_resource_creation()
training_pipeline_id: str = custom_container_training_job_obj.name
Expand Down Expand Up @@ -931,6 +938,8 @@ class CreateCustomPythonPackageTrainingJobOperator(CustomTrainingJobBaseOperator
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
Expand Down Expand Up @@ -1043,6 +1052,7 @@ def execute(self, context: Context):
timestamp_split_column_name=self.timestamp_split_column_name,
tensorboard=self.tensorboard,
sync=True,
psc_interface_config=self.psc_interface_config,
)

if model:
Expand Down Expand Up @@ -1110,6 +1120,7 @@ def invoke_defer(self, context: Context) -> None:
predefined_split_column_name=self.predefined_split_column_name,
timestamp_split_column_name=self.timestamp_split_column_name,
tensorboard=self.tensorboard,
psc_interface_config=self.psc_interface_config,
)
custom_python_training_job_obj.wait_for_resource_creation()
training_pipeline_id: str = custom_python_training_job_obj.name
Expand Down Expand Up @@ -1389,6 +1400,8 @@ class CreateCustomTrainingJobOperator(CustomTrainingJobBaseOperator):
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
:param psc_interface_config: Optional. Configuration for Private Service Connect interface used for
training.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
Expand Down Expand Up @@ -1506,6 +1519,7 @@ def execute(self, context: Context):
timestamp_split_column_name=self.timestamp_split_column_name,
tensorboard=self.tensorboard,
sync=True,
psc_interface_config=None,
)

if model:
Expand Down Expand Up @@ -1573,6 +1587,7 @@ def invoke_defer(self, context: Context) -> None:
predefined_split_column_name=self.predefined_split_column_name,
timestamp_split_column_name=self.timestamp_split_column_name,
tensorboard=self.tensorboard,
psc_interface_config=self.psc_interface_config,
)
custom_training_job_obj.wait_for_resource_creation()
training_pipeline_id: str = custom_training_job_obj.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def test_execute(self, mock_hook, mock_dataset):
is_default_version=None,
model_version_aliases=None,
model_version_description=None,
psc_interface_config=None,
)

@mock.patch(VERTEX_AI_PATH.format("custom_job.Dataset"))
Expand Down Expand Up @@ -407,6 +408,7 @@ def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_da
is_default_version=None,
model_version_aliases=None,
model_version_description=None,
psc_interface_config=None,
)

@mock.patch(VERTEX_AI_PATH.format("custom_job.CreateCustomContainerTrainingJobOperator.hook"))
Expand Down Expand Up @@ -648,6 +650,7 @@ def test_execute(self, mock_hook, mock_dataset):
timestamp_split_column_name=None,
tensorboard=None,
sync=True,
psc_interface_config=None,
)

@mock.patch(VERTEX_AI_PATH.format("custom_job.Dataset"))
Expand Down Expand Up @@ -736,6 +739,7 @@ def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_da
timestamp_split_column_name=None,
tensorboard=None,
sync=True,
psc_interface_config=None,
)

@mock.patch(VERTEX_AI_PATH.format("custom_job.CreateCustomPythonPackageTrainingJobOperator.hook"))
Expand Down Expand Up @@ -976,6 +980,7 @@ def test_execute(self, mock_hook, mock_dataset):
is_default_version=None,
model_version_aliases=None,
model_version_description=None,
psc_interface_config=None,
)

@mock.patch(VERTEX_AI_PATH.format("custom_job.Dataset"))
Expand Down Expand Up @@ -1057,6 +1062,7 @@ def test_execute__parent_model_version_index_is_removed(self, mock_hook, mock_da
is_default_version=None,
model_version_aliases=None,
model_version_description=None,
psc_interface_config=None,
)

@mock.patch(VERTEX_AI_PATH.format("custom_job.CreateCustomTrainingJobOperator.hook"))
Expand Down
Loading