Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
BigQueryIntervalCheckTrigger,
BigQueryValueCheckTrigger,
)
from airflow.providers.google.cloud.utils.bigquery import convert_job_id

if TYPE_CHECKING:
from google.cloud.bigquery import UnknownJob
Expand Down Expand Up @@ -90,8 +91,8 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
job_id_path = XCom.get_value(key="job_id_path", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id_path) if job_id_path else ""


@attr.s(auto_attribs=True)
Expand All @@ -110,7 +111,7 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
job_ids = XCom.get_value(key="job_id_path", ti_key=ti_key)
if not job_ids:
return None
if len(job_ids) < self.index:
Expand Down Expand Up @@ -1184,7 +1185,11 @@ def execute(self, context: Context):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
context["task_instance"].xcom_push(key="job_id", value=job_id)
project_id = self.hook.project_id
if project_id:
job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
context["task_instance"].xcom_push(key="job_id_path", value=job_id_path)
return job_id

def on_kill(self) -> None:
super().on_kill()
Expand Down Expand Up @@ -2727,9 +2732,11 @@ def execute(self, context: Any):
persist_kwargs["dataset_id"] = table["datasetId"]
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)

self.job_id = job.job_id
context["ti"].xcom_push(key="job_id", value=self.job_id)
project_id = self.project_id or self.hook.project_id
if project_id:
job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
context["ti"].xcom_push(key="job_id_path", value=job_id_path)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
17 changes: 17 additions & 0 deletions airflow/providers/google/cloud/utils/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from typing import Any


def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
"""
Expand All @@ -34,3 +36,18 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
return string_field == "true"
else:
return string_field


def convert_job_id(job_id: str | list[str], project_id: str, location: str | None) -> Any:
"""
Helper method that converts to path: project_id:location:job_id
:param project_id: Required. The ID of the Google Cloud project where workspace located.
:param location: Optional. The ID of the Google Cloud region where workspace located.
:param job_id: Required. The ID of the job.
:return: str or list[str] of project_id:location:job_id.
"""
location = location if location else "US"
if isinstance(job_id, list):
return [f"{project_id}:{location}:{i}" for i in job_id]
else:
return f"{project_id}:{location}:{job_id}"
4 changes: 2 additions & 2 deletions tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def test_should_raise_403_forbidden(self):
@mock_plugin_manager(plugins=[])
def test_should_respond_200(self):
XCom.set(
key="job_id",
key="job_id_path",
value="TEST_JOB_ID",
task_id="TEST_SINGLE_QUERY",
dag_id=self.dag.dag_id,
Expand Down Expand Up @@ -171,7 +171,7 @@ def test_should_respond_200_missing_xcom(self):
@mock_plugin_manager(plugins=[])
def test_should_respond_200_multiple_links(self):
XCom.set(
key="job_id",
key="job_id_path",
value=["TEST_JOB_ID_1", "TEST_JOB_ID_2"],
task_id="TEST_MULTIPLE_QUERY",
dag_id=self.dag.dag_id,
Expand Down
46 changes: 27 additions & 19 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
TEST_JOB_ID_1 = "test-job-id"
TEST_JOB_ID_2 = "test-123"
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"


class TestBigQueryCreateEmptyTableOperator:
Expand Down Expand Up @@ -673,10 +677,10 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)

ti.xcom_push("job_id", 12345)
ti.xcom_push("job_id_path", TEST_FULL_JOB_ID)

url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
assert url == "https://console.cloud.google.com/bigquery?j=12345"
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"

@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
Expand Down Expand Up @@ -711,17 +715,18 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])

assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== simple_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== simple_task.get_extra_links(ti, "BigQuery Console #2")
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -740,7 +745,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
self, mock_hook, create_task_instance_of_operator
self,
mock_hook,
create_task_instance_of_operator,
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
Expand All @@ -751,11 +758,11 @@ def test_bigquery_operator_extra_link_when_single_query(
)
bigquery_task = ti.task

job_id = "12345"
ti.xcom_push(key="job_id", value=job_id)
ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)

assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
ti, BigQueryConsoleLink.name
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -771,17 +778,18 @@ def test_bigquery_operator_extra_link_when_multiple_query(
)
bigquery_task = ti.task

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])

assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
)


Expand Down