Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,87 @@ use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_group]
:end-before: [END howto_operator_dataplex_catalog_update_entry_group]

.. _howto/operator:DataplexCatalogCreateEntryTypeOperator:

Create an EntryType
--------------------

To create an Entry Type in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator`
For more information about the available fields to pass when creating an Entry Type, visit `Entry Type resource configuration. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryTypes#EntryType>`__

A simple Entry Group configuration can look as followed:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_entry_type_configuration]
:end-before: [END howto_dataplex_entry_type_configuration]

With this configuration you can create an Entry Type resource:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_create_entry_type]
:end-before: [END howto_operator_dataplex_catalog_create_entry_type]

.. _howto/operator:DataplexCatalogDeleteEntryTypeOperator:

Delete an EntryType
--------------------

To delete an Entry Type in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryTypeOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_delete_entry_type]
:end-before: [END howto_operator_dataplex_catalog_delete_entry_type]

.. _howto/operator:DataplexCatalogListEntryTypesOperator:

List EntryTypes
----------------

To list all Entry Types in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryTypesOperator`.
This operator also supports filtering and ordering the result of the operation.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_list_entry_types]
:end-before: [END howto_operator_dataplex_catalog_list_entry_types]

.. _howto/operator:DataplexCatalogGetEntryTypeOperator:

Get an EntryType
-----------------

To retrieve an Entry Group in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryTypeOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_get_entry_type]
:end-before: [END howto_operator_dataplex_catalog_get_entry_type]

.. _howto/operator:DataplexCatalogUpdateEntryTypeOperator:

Update an EntryType
--------------------

To update an Entry Type in specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryTypeOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_type]
:end-before: [END howto_operator_dataplex_catalog_update_entry_type]
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ EntryGroup
EntryGroups
entrypoint
entrypoints
EntryType
EntryTypes
Enum
enum
enums
Expand Down
200 changes: 199 additions & 1 deletion providers/src/airflow/providers/google/cloud/hooks/dataplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
DataScan,
DataScanJob,
EntryGroup,
EntryType,
Lake,
Task,
Zone,
Expand All @@ -54,7 +55,10 @@
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager
from google.cloud.dataplex_v1.services.catalog_service.pagers import (
ListEntryGroupsPager,
ListEntryTypesPager,
)
from googleapiclient.discovery import Resource

PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
Expand Down Expand Up @@ -134,6 +138,200 @@ def wait_for_operation(self, timeout: float | None, operation: Operation):
error = operation.exception(timeout=timeout)
raise AirflowException(error)

@GoogleBaseHook.fallback_to_default_project_id
def create_entry_type(
self,
location: str,
entry_type_id: str,
entry_type_configuration: EntryType | dict,
project_id: str = PROVIDE_PROJECT_ID,
validate_only: bool = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Create an EntryType resource.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_type_id: Required. EntryType identifier.
:param entry_type_configuration: Required. EntryType configuration body.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. If set, performs request validation, but does not actually execute
the create request.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.create_entry_type(
request={
"parent": client.common_location_path(project_id, location),
"entry_type_id": entry_type_id,
"entry_type": entry_type_configuration,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def get_entry_type(
self,
location: str,
entry_type_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> EntryType:
"""
Get an EntryType resource.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_type_id: Required. EntryGroup identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.get_entry_type(
request={
"name": client.entry_type_path(project_id, location, entry_type_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def delete_entry_type(
self,
location: str,
entry_type_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Delete an EntryType resource.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_type_id: Required. EntryType identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.delete_entry_type(
request={
"name": client.entry_type_path(project_id, location, entry_type_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def list_entry_types(
self,
location: str,
filter_by: str | None = None,
order_by: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> ListEntryTypesPager:
"""
List EntryTypes resources from specific location.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param filter_by: Optional. Filter to apply on the list results.
:param order_by: Optional. Fields to order the results by.
:param page_size: Optional. Maximum number of EntryGroups to return on one page.
:param page_token: Optional. Token to retrieve the next page of results.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.list_entry_types(
request={
"parent": client.common_location_path(project_id, location),
"filter": filter_by,
"order_by": order_by,
"page_size": page_size,
"page_token": page_token,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def update_entry_type(
self,
location: str,
entry_type_id: str,
entry_type_configuration: dict | EntryType,
project_id: str = PROVIDE_PROJECT_ID,
update_mask: list[str] | FieldMask | None = None,
validate_only: bool | None = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Update an EntryType resource.

:param entry_type_id: Required. ID of the EntryType to update.
:param entry_type_configuration: Required. The updated configuration body of the EntryType.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param update_mask: Optional. Names of fields whose values to overwrite on an entry group.
If this parameter is absent or empty, all modifiable fields are overwritten. If such
fields are non-required and omitted in the request body, their values are emptied.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. The service validates the request without performing any mutations.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
_entry_type = (
deepcopy(entry_type_configuration)
if isinstance(entry_type_configuration, dict)
else EntryType.to_dict(entry_type_configuration)
)
_entry_type["name"] = client.entry_type_path(project_id, location, entry_type_id)
return client.update_entry_type(
request={
"entry_type": _entry_type,
"update_mask": FieldMask(paths=update_mask) if type(update_mask) is list else update_mask,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def create_entry_group(
self,
Expand Down
49 changes: 49 additions & 0 deletions providers/src/airflow/providers/google/cloud/links/dataplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
DATAPLEX_CATALOG_ENTRY_GROUP_LINK = (
"/dataplex/projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPE_LINK = (
"/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPES_LINK = "/dataplex/catalog/entry-types?project={project_id}"


class DataplexTaskLink(BaseGoogleLink):
Expand Down Expand Up @@ -150,3 +154,48 @@ def persist(
"project_id": task_instance.project_id,
},
)


class DataplexCatalogEntryTypeLink(BaseGoogleLink):
"""Helper class for constructing Dataplex Catalog EntryType link."""

name = "Dataplex Catalog EntryType"
key = "dataplex_catalog_entry_type_key"
format_str = DATAPLEX_CATALOG_ENTRY_TYPE_LINK

@staticmethod
def persist(
context: Context,
task_instance,
):
task_instance.xcom_push(
context=context,
key=DataplexCatalogEntryTypeLink.key,
value={
"entry_type_id": task_instance.entry_type_id,
"location": task_instance.location,
"project_id": task_instance.project_id,
},
)


class DataplexCatalogEntryTypesLink(BaseGoogleLink):
"""Helper class for constructing Dataplex Catalog EntryTypes link."""

name = "Dataplex Catalog EntryTypes"
key = "dataplex_catalog_entry_types_key"
format_str = DATAPLEX_CATALOG_ENTRY_TYPES_LINK

@staticmethod
def persist(
context: Context,
task_instance,
):
task_instance.xcom_push(
context=context,
key=DataplexCatalogEntryTypesLink.key,
value={
"location": task_instance.location,
"project_id": task_instance.project_id,
},
)
Loading