AirflowException Crashing the Scheduler During the scheduling loop (_verify_integrity_if_dag_changed) #27622

@ramziyassine

Description

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Deployment

  • Airflow Version: 2.4.1
  • Infrastructure: AWS ECS
  • Number of DAGs: 162
Version: [v2.4.1](https://pypi.python.org/pypi/apache-airflow/2.4.1)
Git Version: .release:2.4.1+7b979def75923ba28dd64e31e613043d29f34fce

The issue

We see this issue when the scheduler tries to schedule a large number of DAGs (140+) at around the same time:

[2022-11-11T00:15:00.311+0000] {{dagbag.py:196}} WARNING - Serialized DAG mongodb-assistedbreakdown-jobs-processes no longer exists
[2022-11-11T00:15:00.312+0000] {{scheduler_job.py:763}} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 866, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 948, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1292, in _schedule_dag_run
    self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1321, in _verify_integrity_if_dag_changed
    dag_run.verify_integrity(session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 874, in verify_integrity
    dag = self.get_dag()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 484, in get_dag
    raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun mongodb-assistedbreakdown-jobs-processes @ 2022-11-10 00:10:00+00:00: scheduled__2022-11-10T00:10:00+00:00, state:running, queued_at: 2022-11-11 00:10:09.363852+00:00. externally triggered: False> needs to be set

Main Cause

raise AirflowException(f"The DAG (.dag) for {self} needs to be set")

We believe the exception is raised in get_dag in airflow/models/dagrun.py (line 484 in the traceback above), in the Airflow GitHub repository.
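The failure path in the traceback can be reproduced in miniature without Airflow. The sketch below uses hypothetical stand-in classes (FakeDagBag, FakeDagRun), not Airflow's real ones. It assumes, based on the warning and traceback above, that the DAG lookup returns None once the serialized DAG no longer exists and that get_dag then raises.

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException (assumption)."""


class FakeDagBag:
    """Hypothetical stand-in: maps dag_id to a DAG object, None when missing."""

    def __init__(self, dags):
        self._dags = dict(dags)

    def get_dag(self, dag_id):
        # Models the "Serialized DAG ... no longer exists" case by returning None.
        return self._dags.get(dag_id)


class FakeDagRun:
    """Hypothetical stand-in for a DagRun row loaded from the database."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.dag = None

    def get_dag(self):
        if not self.dag:
            raise AirflowException(
                f"The DAG (.dag) for {self.dag_id} needs to be set"
            )
        return self.dag

    def verify_integrity(self):
        # The real method does much more; only the DAG access matters here.
        return self.get_dag()


def verify_integrity_if_dag_changed(dag_run, dagbag):
    # The lookup result is assigned without a None check, so a missing DAG
    # only surfaces later, as an exception inside verify_integrity().
    dag_run.dag = dagbag.get_dag(dag_run.dag_id)
    dag_run.verify_integrity()
```

Calling verify_integrity_if_dag_changed with a dag_id that is absent from the FakeDagBag raises the same message as in the log, which is consistent with the warning being logged one millisecond before the crash.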

We saw a large number of connections hitting our Airflow database, but CPU was around 60%. Is there any workaround or configuration that can keep the scheduler from crashing when this happens?

What you think should happen instead

Can the scheduler handle this case safely instead of crashing, or, when it comes back up, reschedule the DAGs that got stuck?
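As a rough illustration of the requested behavior, the sketch below skips a run whose DAG cannot be found and carries on with the rest of the loop. It is not Airflow's implementation: schedule_runs_safely and lookup_dag are hypothetical names, and DAG runs are reduced to plain dag_id strings.

```python
import logging

log = logging.getLogger("scheduler_sketch")


def schedule_runs_safely(dag_ids, lookup_dag):
    """Split dag_ids into those that can be scheduled and those skipped.

    dag_ids: iterable of dag_id strings (a simplification of DagRun objects).
    lookup_dag: callable returning a DAG object, or None when it is missing.
    """
    scheduled, skipped = [], []
    for dag_id in dag_ids:
        dag = lookup_dag(dag_id)
        if dag is None:
            # Log and move on instead of letting one missing DAG end the loop.
            log.warning("DAG %s not found; skipping this run for now", dag_id)
            skipped.append(dag_id)
            continue
        scheduled.append(dag_id)
    return scheduled, skipped
```

For example, with known = {"a": object()}, calling schedule_runs_safely(["a", "b"], known.get) returns (["a"], ["b"]), so the skipped run can be retried on a later loop once the DAG is available again.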

How to reproduce

No response

Operating System

Amazon Linux 2, Fargate deployment using the airflow Image

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

AWS ECS Fargate

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

  • affected_version:2.4 (Issues Reported for 2.4)
  • area:Scheduler (including HA (high availability) scheduler)
  • kind:bug (This is a clearly a bug)