77 changes: 77 additions & 0 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -41,6 +41,7 @@
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
from airflow.providers.google.cloud.triggers.bigquery import BigQueryInsertJobTrigger
from airflow.utils.helpers import merge_dicts

if TYPE_CHECKING:
from google.api_core.retry import Retry
@@ -294,6 +295,8 @@ def __init__(
        self.reattach_states: set[str] = reattach_states or set()
        self.cancel_on_kill = cancel_on_kill

        self.source_uris: list[str] = []

    def _submit_job(
        self,
        hook: BigQueryHook,
@@ -731,3 +734,77 @@ def on_kill(self) -> None:
            self.hook.cancel_job(job_id=self.job_id, location=self.location)  # type: ignore[union-attr]
        else:
            self.log.info("Skipping to cancel job: %s.%s", self.location, self.job_id)

    def get_openlineage_facets_on_complete(self, task_instance):
        """Implement on_complete as we will include the final BQ job id."""
        from pathlib import Path

        from openlineage.client.facet import (
            ExternalQueryRunFacet,
            SymlinksDatasetFacet,
            SymlinksDatasetFacetIdentifiers,
        )
        from openlineage.client.run import Dataset

        from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
        from airflow.providers.google.cloud.utils.openlineage import (
            get_facets_from_bq_table,
            get_identity_column_lineage_facet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        table_object = self.hook.get_client(self.hook.project_id).get_table(
            self.destination_project_dataset_table
        )

        output_dataset_facets = get_facets_from_bq_table(table_object)

        input_dataset_facets = {
            "schema": output_dataset_facets["schema"],
        }
        input_datasets = []
        for uri in sorted(self.source_uris):
            bucket, blob = _parse_gcs_url(uri)
            additional_facets = {}

            if "*" in blob:
                # If a wildcard ("*") is used in the GCS path, we want the dataset name to be the
                # directory name, but we create a symlink to the full object path with the wildcard.
                additional_facets = {
                    "symlink": SymlinksDatasetFacet(
                        identifiers=[
                            SymlinksDatasetFacetIdentifiers(
                                namespace=f"gs://{bucket}", name=blob, type="file"
                            )
                        ]
                    ),
                }
                blob = Path(blob).parent.as_posix()
                if blob == ".":
                    # The blob path has no leading slash, but the root dataset name needs to be "/".
                    blob = "/"

            dataset = Dataset(
                namespace=f"gs://{bucket}",
                name=blob,
                facets=merge_dicts(input_dataset_facets, additional_facets),
            )
            input_datasets.append(dataset)

        output_dataset_facets["columnLineage"] = get_identity_column_lineage_facet(
            field_names=[field.name for field in table_object.schema], input_datasets=input_datasets
        )

        output_dataset = Dataset(
            namespace="bigquery",
            name=str(table_object.reference),
            facets=output_dataset_facets,
        )

        run_facets = {}
        if self.job_id:
            run_facets = {
                "externalQuery": ExternalQueryRunFacet(externalQueryId=self.job_id, source="bigquery"),
            }

        return OperatorLineage(inputs=input_datasets, outputs=[output_dataset], run_facets=run_facets)
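
Not part of the diff: a minimal, self-contained sketch of the wildcard handling above, showing how a source URI is mapped to an OpenLineage input dataset name. The helper name gcs_uri_to_dataset_name and the inline URI parsing are illustrative stand-ins; the real code uses _parse_gcs_url and reads the URIs from self.source_uris.

from pathlib import Path


def gcs_uri_to_dataset_name(uri: str) -> tuple[str, str]:
    # Hypothetical helper mirroring the loop body above; the operator uses _parse_gcs_url instead.
    bucket, _, blob = uri.removeprefix("gs://").partition("/")
    if "*" in blob:
        # With a wildcard, the dataset name becomes the containing directory;
        # the full wildcard path survives only as a symlink facet in the code above.
        blob = Path(blob).parent.as_posix()
        if blob == ".":
            # Wildcard directly under the bucket root: use "/" as the dataset name.
            blob = "/"
    return f"gs://{bucket}", blob


print(gcs_uri_to_dataset_name("gs://my-bucket/raw/2024/*.csv"))  # ('gs://my-bucket', 'raw/2024')
print(gcs_uri_to_dataset_name("gs://my-bucket/*.csv"))           # ('gs://my-bucket', '/')
print(gcs_uri_to_dataset_name("gs://my-bucket/raw/data.csv"))    # ('gs://my-bucket', 'raw/data.csv')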