Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
from airflow.api_fastapi.core_api.services.ui.structure import (
bind_output_assets_to_tasks,
get_upstream_assets,
)
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
Expand Down Expand Up @@ -139,4 +142,6 @@ def structure_data(

data["edges"] += start_edges + edges + end_edges

bind_output_assets_to_tasks(data["edges"], serialized_dag)

return StructureDataResponse(**data)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from __future__ import annotations

from airflow.models.serialized_dag import SerializedDagModel


def get_upstream_assets(
asset_expression: dict, entry_node_ref: str, level: int = 0
Expand Down Expand Up @@ -112,3 +114,32 @@ def get_upstream_assets(
edges = edges + e

return nodes, edges


def bind_output_assets_to_tasks(edges: list[dict], serialized_dag: SerializedDagModel) -> None:
"""
Try to bind the downstream assets to the relevant task that produces them.

This function will mutate the `edges` in place.
"""
outlet_asset_references = serialized_dag.dag_model.task_outlet_asset_references

downstream_asset_related_edges = [edge for edge in edges if edge["target_id"].startswith("asset:")]

for edge in downstream_asset_related_edges:
asset_id = int(edge["target_id"].strip("asset:"))
try:
# Try to attach the outlet asset to the relevant task
outlet_asset_reference = next(
outlet_asset_reference
for outlet_asset_reference in outlet_asset_references
if outlet_asset_reference.asset_id == asset_id
)
edge["source_id"] = outlet_asset_reference.task_id
continue
except StopIteration:
# If no asset reference found, fallback to using the exit node reference
# This can happen because asset aliases are not yet handled, they do no populate
# the `outlet_asset_references` when resolved. Extra lookup is needed. Same for asset-name-ref and
# asset-uri-ref.
pass
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a
{
"is_setup_teardown": None,
"label": None,
"source_id": "task_2",
"source_id": "task_1",
"target_id": f"asset:{asset3_id}",
"is_source_asset": None,
},
Expand Down