Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d4c8020
feat(serialization): change dependency_id in DagDependency with asset…
Lee-W Mar 5, 2025
02d89d7
feat(dag_dependency): add label
Lee-W Mar 6, 2025
ce06144
test(serialization): fix test_derived_dag_deps_sensor
Lee-W Mar 6, 2025
b70e3af
test(serialization): fix test_dag_deps_assets_with_duplicate_asset
Lee-W Mar 6, 2025
4621e0c
test: fix test_derived_dag_deps_operator
Lee-W Mar 6, 2025
fcf165d
test(task_sdk): fix task_sdk tests due to _resolve_assets renaming
Lee-W Mar 6, 2025
377483d
test(api_fastapi/ui): fix test cases
Lee-W Mar 6, 2025
f1f4ff5
test(serialized_dag): fix test_order_of_deps_is_consistent
Lee-W Mar 6, 2025
423540e
feat(dot_renderer): support label
Lee-W Mar 6, 2025
5f8651e
refactor: refactor retreive_asset_models as resolve_assets_as_dag_dep…
Lee-W Mar 7, 2025
ed334e4
feat: remove db access in serialized object
Lee-W Mar 7, 2025
ddba315
feat(dag_dependency): improve dependency_type typing and split asset-…
Lee-W Mar 10, 2025
42517af
feat(serialized_dag): resolve dag dependency to include asset id
Lee-W Mar 11, 2025
a00ea54
refactor(serialized_dag): extract dag dependency resolving logic as a…
Lee-W Mar 19, 2025
9652506
test(dag_serialization): fix dag_serialization tests
Lee-W Mar 19, 2025
ec26c23
test(api_fastapi): update test cases with asset_id changes
Lee-W Mar 19, 2025
d450304
test(api_fastapi): fix test case with dynamic asset id
Lee-W Mar 19, 2025
b17ad64
fix(serialized_dag): fix postgrest dag dep handling
Lee-W Mar 20, 2025
453d756
Fix asset graph in UI
bbovenzi Mar 20, 2025
5c98ac5
test(api_fastapi): fix structure endpoint test case
Lee-W Mar 21, 2025
7b2de24
fix(api_fastapi): fix /ui/structure
Lee-W Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/ui/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGra
if dep.node_id not in nodes_dict:
nodes_dict[dep.node_id] = {
"id": dep.node_id,
"label": dep.dependency_id,
"label": dep.label,
"type": dep.dependency_type,
}

Expand Down
14 changes: 8 additions & 6 deletions airflow/api_fastapi/core_api/routes/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,18 @@ def structure_data(
nodes.append(
{
"id": dependency.node_id,
"label": dependency.dependency_id,
"label": dependency.label,
"type": dependency.dependency_type,
}
)

upstream_asset_nodes, upstream_asset_edges = get_upstream_assets(
dag.timetable.asset_condition, entry_node_ref["id"]
)
if asset_expression := serialized_dag.dag_model.asset_expression:
upstream_asset_nodes, upstream_asset_edges = get_upstream_assets(
asset_expression, entry_node_ref["id"]
)
data["nodes"] += upstream_asset_nodes
data["edges"] = upstream_asset_edges

data["nodes"] += upstream_asset_nodes
data["edges"] = upstream_asset_edges + start_edges + edges + end_edges
data["edges"] += start_edges + edges + end_edges

return StructureDataResponse(**data)
80 changes: 50 additions & 30 deletions airflow/api_fastapi/core_api/services/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,43 @@

from __future__ import annotations

from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, BaseAsset


def get_upstream_assets(
asset_condition: BaseAsset, entry_node_ref: str, level=0
asset_expression: dict, entry_node_ref: str, level: int = 0
) -> tuple[list[dict], list[dict]]:
edges: list[dict] = []
nodes: list[dict] = []
asset_condition_type: str | None = None

assets: list[Asset | AssetAlias] = []

nested_expression: AssetAll | AssetAny | None = None

if isinstance(asset_condition, AssetAny):
asset_condition_type = "or-gate"

elif isinstance(asset_condition, AssetAll):
asset_condition_type = "and-gate"

if hasattr(asset_condition, "objects"):
for obj in asset_condition.objects:
if isinstance(obj, (AssetAll, AssetAny)):
nested_expression = obj
elif isinstance(obj, (Asset, AssetAlias)):
assets.append(obj)
asset_expression_type: str | None = None

# include assets, asset-alias, asset-name-refs, asset-uri-refs
assets_info: list[dict] = []

nested_expression: dict = {}

expr_key = ""
if asset_expression.keys() == {"any"}:
asset_expression_type = "or-gate"
expr_key = "any"
elif asset_expression.keys() == {"all"}:
asset_expression_type = "and-gate"
expr_key = "all"

if expr_key in asset_expression:
asset_exprs: list[dict] = asset_expression[expr_key]
for expr in asset_exprs:
nested_expr_key = next(iter(expr.keys()))
if nested_expr_key in ("any", "all"):
nested_expression = expr
elif nested_expr_key in ("asset", "alias", "asset-name-ref", "asset-uri-ref"):
asset_info = expr[nested_expr_key]
asset_info["type"] = nested_expr_key if nested_expr_key != "alias" else "asset-alias"

assets_info.append(asset_info)
else:
raise TypeError(f"Unsupported type: {type(obj)}")
raise TypeError(f"Unsupported type: {expr.keys()}")

if asset_condition_type and assets:
asset_condition_id = f"{asset_condition_type}-{level}"
if asset_expression_type and assets_info:
asset_condition_id = f"{asset_expression_type}-{level}"
edges.append(
{
"source_id": asset_condition_id,
Expand All @@ -66,22 +72,36 @@ def get_upstream_assets(
"id": asset_condition_id,
"label": asset_condition_id,
"type": "asset-condition",
"asset_condition_type": asset_condition_type,
"asset_condition_type": asset_expression_type,
}
)

for asset in assets:
for asset in assets_info:
asset_type = asset["type"]

if asset_type == "asset":
source_id = str(asset["id"])
label = asset["name"]
elif asset_type == "asset-alias" or asset_type == "asset-name-ref":
source_id = asset["name"]
label = asset["name"]
elif asset_type == "asset-uri-ref":
source_id = asset["uri"]
label = asset["uri"]
else:
raise TypeError(f"Unsupported type: {asset_type}")

edges.append(
{
"source_id": asset.name,
"source_id": source_id,
"target_id": asset_condition_id,
}
)
nodes.append(
{
"id": asset.name,
"label": asset.name,
"type": "asset-alias" if isinstance(asset, AssetAlias) else "asset",
"id": source_id,
"label": label,
"type": asset_type,
}
)

Expand Down
200 changes: 195 additions & 5 deletions airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@

import logging
import zlib
from collections.abc import Iterable
from collections.abc import Iterable, Iterator, Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any

import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType

from airflow.exceptions import TaskNotFound
from airflow.models.asset import (
AssetAliasModel,
AssetModel,
)
from airflow.models.base import ID_LEN, Base
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.sdk.definitions.asset import AssetUniqueKey
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.settings import COMPRESS_SERIALIZED_DAGS, MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
Expand All @@ -58,6 +63,187 @@
log = logging.getLogger(__name__)


class _DagDependenciesResolver:
"""Resolver that resolves dag dependencies to include asset id and assets link to asset aliases."""

def __init__(self, dag_id_dependencies: Sequence[tuple[str, dict]], session: Session) -> None:
self.dag_id_dependencies = dag_id_dependencies
self.session = session

self.asset_key_to_id: dict[AssetUniqueKey, int] = {}
self.asset_ref_name_to_asset_id_name: dict[str, tuple[int, str]] = {}
self.asset_ref_uri_to_asset_id_name: dict[str, tuple[int, str]] = {}
self.alias_names_to_asset_ids_names: dict[str, list[tuple[int, str]]] = {}

def resolve(self) -> dict[str, list[DagDependency]]:
asset_names_uris, asset_ref_names, asset_ref_uris, asset_alias_names = self.collect_asset_info()

self.asset_key_to_id = self.collect_asset_key_to_ids(asset_names_uris)
self.asset_ref_name_to_asset_id_name = self.collect_asset_name_ref_to_ids_names(asset_ref_names)
self.asset_ref_uri_to_asset_id_name = self.collect_asset_uri_ref_to_ids_names(asset_ref_uris)
self.alias_names_to_asset_ids_names = self.collect_alias_to_assets(asset_alias_names)

dag_depdendencies_by_dag: dict[str, list[DagDependency]] = {}
for dag_id, deps_data in self.dag_id_dependencies:
dag_deps: list[DagDependency] = []
for dep_data in deps_data or {}:
dep_type = dep_data["dependency_type"]
if dep_type == "asset":
dag_deps.append(self.resolve_asset_dag_dep(dep_data))
elif dep_type == "asset-name-ref":
dag_deps.extend(self.resolve_asset_name_ref_dag_dep(dep_data))
elif dep_type == "asset-uri-ref":
dag_deps.extend(self.resolve_asset_uri_ref_dag_dep(dep_data))
elif dep_type == "asset-alias":
dag_deps.extend(self.resolve_asset_alias_dag_dep(dep_data))
else:
# Replace asset_key with asset id if it's in source or target
for node_key in ("source", "target"):
if dep_data[node_key].startswith("asset:"):
unique_key = AssetUniqueKey.from_str(dep_data[node_key].split(":")[1])
asset_id = self.asset_key_to_id[unique_key]
dep_data[node_key] = f"asset:{asset_id}"
break

dag_deps.append(DagDependency(**dep_data))

dag_depdendencies_by_dag[dag_id] = dag_deps
return dag_depdendencies_by_dag

def collect_asset_info(self) -> tuple[set, set, set, set]:
asset_names_uris: set[tuple[str, str]] = set()
asset_ref_names: set[str] = set()
asset_ref_uris: set[str] = set()
asset_alias_names: set[str] = set()
for _, deps_data in self.dag_id_dependencies:
for dep_data in deps_data or {}:
dep_type = dep_data["dependency_type"]
dep_id = dep_data["dependency_id"]
if dep_type == "asset":
unique_key = AssetUniqueKey.from_str(dep_id)
asset_names_uris.add((unique_key.name, unique_key.uri))
elif dep_type == "asset-name-ref":
asset_ref_names.add(dep_id)
elif dep_type == "asset-uri-ref":
asset_ref_uris.add(dep_id)
elif dep_type == "asset-alias":
asset_alias_names.add(dep_id)
return asset_names_uris, asset_ref_names, asset_ref_uris, asset_alias_names

def collect_asset_key_to_ids(self, asset_name_uris: set[tuple[str, str]]) -> dict[AssetUniqueKey, int]:
return {
AssetUniqueKey(name=name, uri=uri): asset_id
for name, uri, asset_id in self.session.execute(
select(AssetModel.name, AssetModel.uri, AssetModel.id).where(
tuple_(AssetModel.name, AssetModel.uri).in_(asset_name_uris)
)
)
}

def collect_asset_name_ref_to_ids_names(self, asset_ref_names) -> dict[str, tuple[int, str]]:
return {
name: (asset_id, name)
for name, asset_id in self.session.execute(
select(AssetModel.name, AssetModel.id).where(
AssetModel.name.in_(asset_ref_names), AssetModel.active.has()
)
)
}

def collect_asset_uri_ref_to_ids_names(self, asset_ref_uris) -> dict[str, tuple[int, str]]:
return {
uri: (asset_id, name)
for uri, name, asset_id in self.session.execute(
select(AssetModel.uri, AssetModel.name, AssetModel.id).where(
AssetModel.uri.in_(asset_ref_uris), AssetModel.active.has()
)
)
}

def collect_alias_to_assets(self, asset_alias_names) -> dict[str, list[tuple[int, str]]]:
return {
aam.name: [(am.id, am.name) for am in aam.assets]
for aam in self.session.scalars(
select(AssetAliasModel).where(AssetAliasModel.name.in_(asset_alias_names))
)
}

def resolve_asset_dag_dep(self, dep_data: dict) -> DagDependency:
dep_id = dep_data["dependency_id"]
unique_key = AssetUniqueKey.from_str(dep_id)
dep_data["dependency_id"] = str(self.asset_key_to_id[unique_key])
return DagDependency(**dep_data)

def resolve_asset_name_ref_dag_dep(self, dep_data) -> Sequence[DagDependency]:
dep_id = dep_data["dependency_id"]
is_source_ref = dep_data["source"] == "asest-name-ref"
asset_id, asset_name = self.asset_ref_name_to_asset_id_name[dep_id]
return [
# asset
DagDependency(
source="asset" if is_source_ref else f"asset-name-ref:{dep_id}",
target=f"asset-name-ref:{dep_id}" if is_source_ref else "asset",
label=asset_name,
dependency_type="asset",
dependency_id=str(asset_id),
),
# asset ref
DagDependency(
source=f"asset:{asset_id}" if is_source_ref else dep_data["source"],
target=dep_data["target"] if is_source_ref else f"asset:{asset_id}",
label=dep_id,
dependency_type="asset-name-ref",
dependency_id=dep_id,
),
]

def resolve_asset_uri_ref_dag_dep(self, dep_data: dict) -> Sequence[DagDependency]:
dep_id = dep_data["dependency_id"]
is_source_ref = dep_data["source"] == "asest-uri-ref"
asset_id, asset_name = self.asset_ref_uri_to_asset_id_name[dep_id]
return [
# asset
DagDependency(
source="asset" if is_source_ref else f"asset-uri-ref:{dep_id}",
target=f"asset-uri-ref:{dep_id}" if is_source_ref else "asset",
label=asset_name,
dependency_type="asset",
dependency_id=str(asset_id),
),
# asset ref
DagDependency(
source=f"asset:{asset_id}" if is_source_ref else dep_data["source"],
target=dep_data["target"] if is_source_ref else f"asset:{asset_id}",
label=dep_id,
dependency_type="asset-uri-ref",
dependency_id=dep_id,
),
]

def resolve_asset_alias_dag_dep(self, dep_data: dict) -> Iterator[DagDependency]:
dep_id = dep_data["dependency_id"]
for asset_id, asset_name in self.alias_names_to_asset_ids_names[dep_id]:
is_source_alias = dep_data["source"] == "asset-alias"
yield from [
# asset
DagDependency(
source="asset" if is_source_alias else f"asset-alias:{dep_id}",
target=f"asset-alias:{dep_id}" if is_source_alias else "asset",
label=asset_name,
dependency_type="asset",
dependency_id=str(asset_id),
),
# asset alias
DagDependency(
source=f"asset:{asset_id}" if is_source_alias else dep_data["source"],
target=dep_data["target"] if is_source_alias else f"asset:{asset_id}",
label=dep_id,
dependency_type="asset-alias",
dependency_id=dep_id,
),
]


class SerializedDagModel(Base):
"""
A table for serialized DAGs.
Expand Down Expand Up @@ -477,7 +663,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
.join(cls.dag_model)
.where(DagModel.is_active)
)
iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query)
iterator = [(dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query]
else:
iterator = session.execute(
select(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies"))
Expand All @@ -488,8 +674,12 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
)
.join(cls.dag_model)
.where(DagModel.is_active)
)
return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator}
).all()

resolver = _DagDependenciesResolver(dag_id_dependencies=iterator, session=session)
dag_depdendencies_by_dag = resolver.resolve()

return dag_depdendencies_by_dag

@staticmethod
@provide_session
Expand Down
Loading