Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9b0a1a2
Refactor deprecated SQLA models/test_serialized_dag.py
rich7420 Dec 15, 2025
ef4e7e1
Refactor deprecated SQLA models/test_pool.py
rich7420 Dec 15, 2025
33151c3
Refactor deprecated SQLA models/test_trigger.py
rich7420 Dec 15, 2025
1425f83
Refactor deprecated SQLA models/test_callback.py
rich7420 Dec 15, 2025
f95e80c
Refactor deprecated SQLA models/test_xcom.py
rich7420 Dec 15, 2025
2f1e8b7
Refactor deprecated SQLA models/test_cleartasks.py
rich7420 Dec 15, 2025
6416731
Refactor deprecated SQLA models/test_dagrun.py
rich7420 Dec 15, 2025
f78fec3
Refactor deprecated SQLA test_log_handlers.py
rich7420 Dec 15, 2025
bdc4d6a
fix error
rich7420 Dec 15, 2025
d98f35d
Refactor deprecated SQLA utils/test_state.py
rich7420 Dec 15, 2025
516b864
fix error
rich7420 Dec 15, 2025
7815f6e
Merge branch 'main' into sqla2-59402-10
rich7420 Dec 16, 2025
f5a879a
remove redundant parts
rich7420 Dec 16, 2025
75cd94a
Merge branch 'main' into sqla2-59402-7
rich7420 Dec 16, 2025
387c02b
Merge branch 'main' into sqla2-59402-7
rich7420 Dec 17, 2025
78ba885
change to where()
rich7420 Dec 17, 2025
f23255c
Merge branch 'sqla2-59-2' into multi-59402
rich7420 Dec 17, 2025
e02151f
Merge branch 'sqla2-59402-3' into multi-59402
rich7420 Dec 17, 2025
36b86ab
Merge branch 'sqla2-59402-4' into multi-59402
rich7420 Dec 17, 2025
fc66f2e
Merge branch 'sqla2-59402-5' into multi-59402
rich7420 Dec 17, 2025
f275664
Merge branch 'sqla2-59402-6' into multi-59402
rich7420 Dec 17, 2025
71dc13c
Merge branch 'sqla2-59402-7' into multi-59402
rich7420 Dec 17, 2025
eaad826
Merge branch 'sqla2-59402-9' into multi-59402
rich7420 Dec 17, 2025
ee62673
Merge branch 'sqla2-59402-10' into multi-59402
rich7420 Dec 17, 2025
44161a7
fix pre-commit error
rich7420 Dec 17, 2025
1e954bf
fix error
rich7420 Dec 17, 2025
e8330ca
remove redundant commit()
rich7420 Dec 18, 2025
f4fdea1
Merge branch 'main' into multi-59402
rich7420 Dec 18, 2025
24e5da0
Merge branch 'main' into multi-59402
rich7420 Dec 19, 2025
f2947bb
Merge branch 'main' into multi-59402
potiuk Dec 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,13 @@ repos:
^airflow-ctl.*\.py$|
^airflow-core/src/airflow/models/.*\.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
^airflow-core/tests/unit/models/test_serialized_dag.py$|
^airflow-core/tests/unit/models/test_pool.py$|
^airflow-core/tests/unit/models/test_trigger.py$|
^airflow-core/tests/unit/models/test_callback.py$|
^airflow-core/tests/unit/models/test_cleartasks.py$|
^airflow-core/tests/unit/models/test_xcom.py$|
^airflow-core/tests/unit/models/test_dagrun.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_sources.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py$|
^airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py$|
Expand All @@ -444,6 +451,8 @@ repos:
^airflow-core/tests/unit/models/test_dagwarning.py$|
^airflow-core/tests/integration/otel/test_otel.py$|
^airflow-core/tests/unit/utils/test_db_cleanup.py$|
^airflow-core/tests/unit/utils/test_state.py$|
^airflow-core/tests/unit/utils/test_log_handlers.py$|
^dev/airflow_perf/scheduler_dag_execution_timing.py$|
^providers/celery/.*\.py$|
^providers/cncf/kubernetes/.*\.py$|
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/tests/unit/models/test_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import pytest
from sqlalchemy import select

from airflow.models import Trigger
from airflow.models.callback import (
Expand Down Expand Up @@ -118,7 +119,7 @@ def test_polymorphic_serde(self, session):
session.add(callback)
session.commit()

retrieved = session.query(Callback).filter_by(id=callback.id).one()
retrieved = session.scalar(select(Callback).where(Callback.id == callback.id))
assert isinstance(retrieved, TriggererCallback)
assert retrieved.fetch_method == CallbackFetchMethod.IMPORT_PATH
assert retrieved.data == TEST_ASYNC_CALLBACK.serialize()
Expand Down Expand Up @@ -188,7 +189,7 @@ def test_polymorphic_serde(self, session):
session.add(callback)
session.commit()

retrieved = session.query(Callback).filter_by(id=callback.id).one()
retrieved = session.scalar(select(Callback).where(Callback.id == callback.id))
assert isinstance(retrieved, ExecutorCallback)
assert retrieved.fetch_method == CallbackFetchMethod.IMPORT_PATH
assert retrieved.data == TEST_SYNC_CALLBACK.serialize()
Expand Down
37 changes: 18 additions & 19 deletions airflow-core/tests/unit/models/test_cleartasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import random

import pytest
from sqlalchemy import select
from sqlalchemy import func, select

from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -87,7 +87,7 @@ def test_clear_task_instances(self, dag_maker):
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session)

ti0.refresh_from_db(session)
Expand Down Expand Up @@ -121,7 +121,7 @@ def test_clear_task_instances_external_executor_id(self, dag_maker):
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session)

ti0.refresh_from_db()
Expand Down Expand Up @@ -186,12 +186,12 @@ def test_clear_task_instances_dr_state(self, state, last_scheduling, dag_maker):
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
assert session.query(TaskInstanceHistory).count() == 0
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
assert session.scalar(select(func.count()).select_from(TaskInstanceHistory)) == 0
clear_task_instances(qry, session, dag_run_state=state)
session.flush()
# 2 TIs were cleared so 2 history records should be created
assert session.query(TaskInstanceHistory).count() == 2
assert session.scalar(select(func.count()).select_from(TaskInstanceHistory)) == 2

session.refresh(dr)

Expand Down Expand Up @@ -229,7 +229,7 @@ def test_clear_task_instances_on_running_dr(self, state, dag_maker):
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session)
session.flush()

Expand Down Expand Up @@ -282,7 +282,7 @@ def test_clear_task_instances_on_finished_dr(self, state, last_scheduling, dag_m
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session)
session.flush()

Expand Down Expand Up @@ -394,7 +394,7 @@ def test_clear_task_instances_without_dag_param(self, dag_maker, session):
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session)

ti0.refresh_from_db(session=session)
Expand Down Expand Up @@ -477,20 +477,19 @@ def test_clear_task_instances_with_task_reschedule(self, dag_maker):
with create_session() as session:

def count_task_reschedule(ti):
return session.query(TaskReschedule).filter(TaskReschedule.ti_id == ti.id).count()
return session.scalar(
select(func.count()).select_from(TaskReschedule).where(TaskReschedule.ti_id == ti.id)
)

assert count_task_reschedule(ti0) == 1
assert count_task_reschedule(ti1) == 1
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = (
session.query(TI)
.filter(TI.dag_id == dag.dag_id, TI.task_id == ti0.task_id)
.order_by(TI.task_id)
.all()
)
qry = session.scalars(
select(TI).where(TI.dag_id == dag.dag_id, TI.task_id == ti0.task_id).order_by(TI.task_id)
).all()
clear_task_instances(qry, session)
assert count_task_reschedule(ti0) == 0
assert count_task_reschedule(ti1) == 1
Expand Down Expand Up @@ -531,7 +530,7 @@ def test_task_instance_history_record(self, state, state_recorded, dag_maker):
ti1.state = state
session = dag_maker.session
session.flush()
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session)
session.flush()

Expand Down Expand Up @@ -716,10 +715,10 @@ def test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver
new_dag_version = DagVersion.get_latest_version(dag.dag_id)

assert old_dag_version.id != new_dag_version.id
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
qry = session.scalars(select(TI).where(TI.dag_id == dag.dag_id).order_by(TI.task_id)).all()
clear_task_instances(qry, session, run_on_latest_version=run_on_latest_version)
session.commit()
dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).one()
dr = session.scalar(select(DagRun).where(DagRun.dag_id == dag.dag_id))
if run_on_latest_version:
assert dr.created_dag_version_id == new_dag_version.id
assert dr.bundle_version == new_dag_version.bundle_version
Expand Down
Loading
Loading