Skip to content

Commit

Permalink
Refactor ExternalDagLink to not create ad hoc TaskInstances
Browse files Browse the repository at this point in the history
This operator link creates an ad hoc TaskInstance to build its output URL. Generally we moved away from this approach starting with apache#21285.

This PR updates the operator link to use `TaskInstance.get_task_instance()` instead.
  • Loading branch information
josh-fell committed Dec 9, 2023
1 parent 845bcd1 commit d3e0743
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions airflow/sensors/external_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
if TYPE_CHECKING:
from sqlalchemy.orm import Query, Session

from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context


Expand All @@ -57,10 +59,20 @@ class ExternalDagLink(BaseOperatorLink):

name = "External DAG"

def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
ti = TaskInstance.get_task_instance(
dag_id=ti_key.dag_id, run_id=ti_key.run_id, task_id=ti_key.task_id, map_index=ti_key.map_index
)

if TYPE_CHECKING:
assert ti is not None

operator.render_template_fields(ti.get_template_context())
query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
query = {
"dag_id": operator.external_dag_id, # type: ignore[attr-defined]
"execution_date": ti.execution_date.isoformat(), # type: ignore[union-attr]
}

return build_airflow_url_with_query(query)


Expand Down

0 comments on commit d3e0743

Please sign in to comment.