Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,6 @@ def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
# added all() to save runtime, otherwise query is executed more than once
dag_runs: Collection[DagRun] = list(DagRun.get_queued_dag_runs_to_set_running(session))

query = (
select(
DagRun.dag_id,
Expand Down Expand Up @@ -1874,8 +1873,10 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
run_id,
)
continue
elif dag.max_active_runs:
if active_runs >= dag.max_active_runs:
elif dag_run.max_active_runs:
# Using dag_run.max_active_runs which links to DagModel to ensure we are checking
# against the most recent changes on the dag and not using stale serialized dag
if active_runs >= dag_run.max_active_runs:
# todo: delete all candidate dag runs for this dag from list right now
self.log.info(
"dag cannot be started due to dag max_active_runs constraint; "
Expand Down
99 changes: 99 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7125,6 +7125,105 @@ def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
# dummy3 should be skipped because dummy1 is skipped.
assert tis[dummy3.task_id].state == State.SKIPPED

def test_start_queued_dagruns_uses_latest_max_active_runs_from_dag_model(self, dag_maker, session) -> None:
    """
    Verify that ``_start_queued_dagruns`` checks ``max_active_runs`` via the
    DagRun's link to DagModel instead of the potentially stale SerializedDAG.

    Scenario: the SerializedDAG snapshot is created with ``max_active_runs=1``,
    then ``DagModel.max_active_runs`` is updated to 2 (simulating a versioned
    bundle update). With one run already RUNNING and one QUEUED, the scheduler
    should start the queued run, because the latest limit (2) permits it even
    though the stale serialized value (1) would not.
    """
    # Create a DAG with max_active_runs=1 initially.
    with dag_maker(
        dag_id="test_max_active_runs_stale_serialized",
        max_active_runs=1,
        session=session,
    ) as dag:
        EmptyOperator(task_id="dummy_task")

    dag_model = dag_maker.dag_model
    assert dag_model.max_active_runs == 1

    # Let the scheduler create the first dag run, which associates it with a
    # SerializedDAG snapshot that still carries max_active_runs=1.
    scheduler_job = Job(executor=self.null_exec)
    self.job_runner = SchedulerJobRunner(job=scheduler_job)
    self.job_runner._create_dag_runs([dag_model], session)

    # Confirm the serialized snapshot attached to that run has the old limit.
    dag_run_1 = (
        session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date).first()
    )
    assert dag_run_1 is not None
    serialized_dag = self.job_runner.scheduler_dag_bag.get_dag_for_run(dag_run_1, session=session)
    assert serialized_dag is not None
    assert serialized_dag.max_active_runs == 1

    # Update DagModel.max_active_runs to 2 (simulating a versioned bundle
    # update). DagModel now holds the latest value while the SerializedDAG
    # snapshot still holds the old one.
    dag_model.max_active_runs = 2
    session.commit()
    session.refresh(dag_model)

    # Move the first run to RUNNING so it counts toward the active-run limit.
    dag_run_1.state = DagRunState.RUNNING
    session.commit()

    # Create a second, queued dag run — the candidate the scheduler must start.
    dag_run_2 = dag_maker.create_dagrun(
        run_id="test_run_2",
        state=DagRunState.QUEUED,
        run_type=DagRunType.SCHEDULED,
        session=session,
    )

    # Refresh so the DagModel relationship behind the dag_run.max_active_runs
    # association proxy reflects the value updated above.
    session.refresh(dag_run_2)

    # Sanity check: exactly 1 running and 1 queued run exist at this point.
    running_count = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag.dag_id, DagRun.state == DagRunState.RUNNING)
        .count()
    )
    queued_count = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag.dag_id, DagRun.state == DagRunState.QUEUED)
        .count()
    )
    assert running_count == 1
    assert queued_count == 1

    # The SerializedDAG still has max_active_runs=1 (stale), while DagModel
    # has max_active_runs=2 (latest).
    assert serialized_dag.max_active_runs == 1
    assert dag_model.max_active_runs == 2

    # Call _start_queued_dagruns.
    # With the fix: starts the queued run (DagModel max_active_runs=2, active_runs=1 < 2).
    # Without the fix: blocks it (SerializedDAG max_active_runs=1, active_runs=1 >= 1).
    self.job_runner._start_queued_dagruns(session)
    session.flush()

    # The queued run starting proves the scheduler used DagModel.max_active_runs=2.
    dag_run_2 = session.get(DagRun, dag_run_2.id)
    assert dag_run_2.state == DagRunState.RUNNING, (
        "The queued dag run should have started because DagModel.max_active_runs=2 "
        "allows it (active_runs=1 < 2), even though SerializedDAG.max_active_runs=1 for that dagrun serdag version "
        "would have blocked it."
    )

    # Final state: both runs are RUNNING.
    running_count = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag.dag_id, DagRun.state == DagRunState.RUNNING)
        .count()
    )
    assert running_count == 2


class TestSchedulerJobQueriesCount:
"""
Expand Down