Skip to content

Commit

Permalink
feat: Add OpenLineage support for some BQ operators (#45422)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda authored Jan 7, 2025
1 parent 1dcc29a commit 4c5d85a
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 34 deletions.
165 changes: 131 additions & 34 deletions providers/src/airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from google.api_core.exceptions import Conflict
from google.cloud.bigquery import DEFAULT_RETRY, CopyJob, ExtractJob, LoadJob, QueryJob, Row
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.table import RowIterator, Table, TableReference

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
Expand Down Expand Up @@ -1339,6 +1339,7 @@ def __init__(
self.cluster_fields = cluster_fields
self.table_resource = table_resource
self.impersonation_chain = impersonation_chain
self._table: Table | None = None
if exists_ok is not None:
warnings.warn(
"`exists_ok` parameter is deprecated, please use `if_exists`",
Expand Down Expand Up @@ -1369,6 +1370,7 @@ def execute(self, context: Context) -> None:

try:
self.log.info("Creating table")
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.create_empty_table(
project_id=self.project_id,
dataset_id=self.dataset_id,
Expand Down Expand Up @@ -1414,19 +1416,22 @@ def execute(self, context: Context) -> None:

BigQueryTableLink.persist(**persist_kwargs)

def get_openlineage_facets_on_complete(self, task_instance):
def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by create method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

table_info = self._table.to_api_repr()["tableReference"]
table_id = ".".join((table_info["projectId"], table_info["datasetId"], table_info["tableId"]))
if not self._table:
self.log.debug("OpenLineage did not find `self._table` attribute.")
return OperatorLineage()

output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=table_id,
name=f"{self._table.project}.{self._table.dataset_id}.{self._table.table_id}",
facets=get_facets_from_bq_table(self._table),
)

Expand Down Expand Up @@ -1649,6 +1654,7 @@ def __init__(
self.encryption_configuration = encryption_configuration
self.location = location
self.impersonation_chain = impersonation_chain
self._table: Table | None = None

def execute(self, context: Context) -> None:
bq_hook = BigQueryHook(
Expand All @@ -1657,15 +1663,16 @@ def execute(self, context: Context) -> None:
impersonation_chain=self.impersonation_chain,
)
if self.table_resource:
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.create_empty_table(
table_resource=self.table_resource,
)
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=self._table.to_api_repr()["tableReference"]["datasetId"],
project_id=self._table.to_api_repr()["tableReference"]["projectId"],
table_id=self._table.to_api_repr()["tableReference"]["tableId"],
dataset_id=self._table.dataset_id,
project_id=self._table.project,
table_id=self._table.table_id,
)
return

Expand Down Expand Up @@ -1716,31 +1723,29 @@ def execute(self, context: Context) -> None:
"encryptionConfiguration": self.encryption_configuration,
}

self._table = bq_hook.create_empty_table(
table_resource=table_resource,
)
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.create_empty_table(table_resource=table_resource)

BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=self._table.to_api_repr()["tableReference"]["datasetId"],
project_id=self._table.to_api_repr()["tableReference"]["projectId"],
table_id=self._table.to_api_repr()["tableReference"]["tableId"],
dataset_id=self._table.dataset_id,
project_id=self._table.project,
table_id=self._table.table_id,
)

def get_openlineage_facets_on_complete(self, task_instance):
def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by create method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

table_info = self._table.to_api_repr()["tableReference"]
table_id = ".".join((table_info["projectId"], table_info["datasetId"], table_info["tableId"]))
output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=table_id,
name=f"{self._table.project}.{self._table.dataset_id}.{self._table.table_id}",
facets=get_facets_from_bq_table(self._table),
)

Expand Down Expand Up @@ -2133,6 +2138,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.table_resource = table_resource
self.impersonation_chain = impersonation_chain
self._table: dict | None = None
super().__init__(**kwargs)

def execute(self, context: Context):
Expand All @@ -2141,7 +2147,8 @@ def execute(self, context: Context):
impersonation_chain=self.impersonation_chain,
)

table = bq_hook.update_table(
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.update_table(
table_resource=self.table_resource,
fields=self.fields,
dataset_id=self.dataset_id,
Expand All @@ -2152,12 +2159,30 @@ def execute(self, context: Context):
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["tableReference"]["datasetId"],
project_id=table["tableReference"]["projectId"],
table_id=table["tableReference"]["tableId"],
dataset_id=self._table["tableReference"]["datasetId"],
project_id=self._table["tableReference"]["projectId"],
table_id=self._table["tableReference"]["tableId"],
)

return self._table

def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by update method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

return table
table = Table.from_api_repr(self._table)
output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=f"{table.project}.{table.dataset_id}.{table.table_id}",
facets=get_facets_from_bq_table(table),
)

return OperatorLineage(outputs=[output_dataset])


class BigQueryUpdateDatasetOperator(GoogleCloudBaseOperator):
Expand Down Expand Up @@ -2291,15 +2316,47 @@ def __init__(
self.ignore_if_missing = ignore_if_missing
self.location = location
self.impersonation_chain = impersonation_chain
self.hook: BigQueryHook | None = None

def execute(self, context: Context) -> None:
self.log.info("Deleting: %s", self.deletion_dataset_table)
hook = BigQueryHook(
# Save hook as attribute for further use by OpenLineage
self.hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id,
location=self.location,
impersonation_chain=self.impersonation_chain,
)
hook.delete_table(table_id=self.deletion_dataset_table, not_found_ok=self.ignore_if_missing)
self.hook.delete_table(table_id=self.deletion_dataset_table, not_found_ok=self.ignore_if_missing)

def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we need default project_id from hook."""
from airflow.providers.common.compat.openlineage.facet import (
Dataset,
LifecycleStateChange,
LifecycleStateChangeDatasetFacet,
PreviousIdentifier,
)
from airflow.providers.google.cloud.openlineage.utils import BIGQUERY_NAMESPACE
from airflow.providers.openlineage.extractors import OperatorLineage

bq_table_id = str(
TableReference.from_string(self.deletion_dataset_table, default_project=self.hook.project_id)
)
ds = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=bq_table_id,
facets={
"lifecycleStateChange": LifecycleStateChangeDatasetFacet(
lifecycleStateChange=LifecycleStateChange.DROP.value,
previousIdentifier=PreviousIdentifier(
namespace=BIGQUERY_NAMESPACE,
name=bq_table_id,
),
)
},
)

return OperatorLineage(inputs=[ds])


class BigQueryUpsertTableOperator(GoogleCloudBaseOperator):
Expand Down Expand Up @@ -2358,6 +2415,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.location = location
self.impersonation_chain = impersonation_chain
self._table: dict | None = None

def execute(self, context: Context) -> None:
self.log.info("Upserting Dataset: %s with table_resource: %s", self.dataset_id, self.table_resource)
Expand All @@ -2366,19 +2424,38 @@ def execute(self, context: Context) -> None:
location=self.location,
impersonation_chain=self.impersonation_chain,
)
table = hook.run_table_upsert(
# Save table as attribute for further use by OpenLineage
self._table = hook.run_table_upsert(
dataset_id=self.dataset_id,
table_resource=self.table_resource,
project_id=self.project_id,
)
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["tableReference"]["datasetId"],
project_id=table["tableReference"]["projectId"],
table_id=table["tableReference"]["tableId"],
dataset_id=self._table["tableReference"]["datasetId"],
project_id=self._table["tableReference"]["projectId"],
table_id=self._table["tableReference"]["tableId"],
)

def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by upsert method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

table = Table.from_api_repr(self._table)
output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=f"{table.project}.{table.dataset_id}.{table.table_id}",
facets=get_facets_from_bq_table(table),
)

return OperatorLineage(outputs=[output_dataset])


class BigQueryUpdateTableSchemaOperator(GoogleCloudBaseOperator):
"""
Expand Down Expand Up @@ -2466,14 +2543,16 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.location = location
self._table: dict | None = None
super().__init__(**kwargs)

def execute(self, context: Context):
bq_hook = BigQueryHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, location=self.location
)

table = bq_hook.update_table_schema(
# Save table as attribute for further use by OpenLineage
self._table = bq_hook.update_table_schema(
schema_fields_updates=self.schema_fields_updates,
include_policy_tags=self.include_policy_tags,
dataset_id=self.dataset_id,
Expand All @@ -2484,11 +2563,29 @@ def execute(self, context: Context):
BigQueryTableLink.persist(
context=context,
task_instance=self,
dataset_id=table["tableReference"]["datasetId"],
project_id=table["tableReference"]["projectId"],
table_id=table["tableReference"]["tableId"],
dataset_id=self._table["tableReference"]["datasetId"],
project_id=self._table["tableReference"]["projectId"],
table_id=self._table["tableReference"]["tableId"],
)
return table
return self._table

def get_openlineage_facets_on_complete(self, _):
"""Implement _on_complete as we will use table resource returned by update method."""
from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.google.cloud.openlineage.utils import (
BIGQUERY_NAMESPACE,
get_facets_from_bq_table,
)
from airflow.providers.openlineage.extractors import OperatorLineage

table = Table.from_api_repr(self._table)
output_dataset = Dataset(
namespace=BIGQUERY_NAMESPACE,
name=f"{table.project}.{table.dataset_id}.{table.table_id}",
facets=get_facets_from_bq_table(table),
)

return OperatorLineage(outputs=[output_dataset])


class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryInsertJobOperatorOpenLineageMixin):
Expand Down
Loading

0 comments on commit 4c5d85a

Please sign in to comment.