Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_, update
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, exists, select, tuple_, update
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType
Expand All @@ -43,6 +43,7 @@
from airflow.models.dag_version import DagVersion
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.definitions.asset import AssetUniqueKey
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
Expand Down Expand Up @@ -423,7 +424,13 @@ def write_dag(
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False

if dag_version and not dag_version.task_instances:
has_task_instances: bool = False
if dag_version:
has_task_instances = bool(
session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id)))
)

if dag_version and not has_task_instances:
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances
Expand Down