Skip to content

Commit

Permalink
Fix query and add created_at to dag_version table
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Oct 18, 2024
1 parent 1664adc commit 55e1257
Show file tree
Hide file tree
Showing 6 changed files with 964 additions and 935 deletions.
3 changes: 3 additions & 0 deletions airflow/migrations/versions/0039_3_0_0_add_dag_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

from airflow.migrations.db_types import StringID
from airflow.models.base import naming_convention
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
revision = "2b47dc6bc8df"
Expand All @@ -54,6 +56,7 @@ def upgrade():
sa.Column("dag_id", StringID(), nullable=False),
sa.Column("dag_code_id", sa.Integer(), nullable=True),
sa.Column("serialized_dag_id", sa.Integer(), nullable=True),
sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow),
sa.ForeignKeyConstraint(("dag_id",), ["dag.dag_id"], name=op.f("dag_version_dag_id_fkey")),
sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")),
)
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3103,9 +3103,9 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None:
dag_statuses = {}
for dag_id, records in by_dag.items():
dag_statuses[dag_id] = {x.dataset.uri: True for x in records}
ser_dags = session.scalars(
select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
).all()
ser_dags = SerializedDagModel.get_latest_serdags_of_given_dags(
list(dag_statuses.keys()), session=session
)
for ser_dag in ser_dags:
dag_id = ser_dag.dag_id
statuses = dag_statuses[dag_id]
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/dag_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
from sqlalchemy.orm import relationship

from airflow.models.base import Base, StringID
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
from sqlalchemy.orm import Session
Expand All @@ -50,6 +52,7 @@ class DagVersion(Base):
serialized_dag = relationship("SerializedDagModel", back_populates="dag_version", uselist=False)
dag_runs = relationship("DagRun", back_populates="dag_version")
task_instances = relationship("TaskInstance", back_populates="dag_version")
created_at = Column(UtcDateTime, default=timezone.utcnow)

def __init__(
self,
Expand Down Expand Up @@ -96,7 +99,7 @@ def write_dag(
):
"""Write a new DagVersion into database."""
existing_dag_version = session.scalar(
select(cls).where(cls.dag_id == dag_id).order_by(cls.version_number.desc()).limit(1)
select(cls).where(cls.dag_id == dag_id).order_by(cls.created_at.desc()).limit(1)
)
version_number = 1

Expand Down
19 changes: 19 additions & 0 deletions airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,25 @@ def write_dag(
def latest_item_select_object(cls, dag_id):
return select(cls).where(cls.dag_id == dag_id).order_by(cls.id.desc()).limit(1)

@classmethod
@provide_session
def get_latest_serdags_of_given_dags(cls, dag_ids: list[str], session: Session = NEW_SESSION):
latest_serialized_dag_subquery = (
session.query(cls.dag_id, func.max(cls.id).label("max_id"))
.filter(cls.dag_id.in_(dag_ids))
.group_by(cls.dag_id)
.subquery()
)
serialized_dags = session.scalars(
select(cls)
.join(
latest_serialized_dag_subquery,
(cls.id == latest_serialized_dag_subquery.c.max_id),
)
.where(cls.dag_id.in_(dag_ids))
).all()
return serialized_dags

@classmethod
@provide_session
def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, SerializedDAG]:
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cfd8d2c4c55ec368afcf35aba9b8b86d46820e1e7c63507fda2b8aebaa05f88b
9b93baf975587d81480850dab7cc4f33e3a9db9d707c57ce1835d0d7ad8d9c74
Loading

0 comments on commit 55e1257

Please sign in to comment.