@AmosG AmosG commented Dec 7, 2025

Fix DAG processor crash on MySQL connection failure during import error recording

Fixes #59166

The DAG processor was crashing when MySQL connection failures occurred while
recording DAG import errors to the database. The root cause was missing
session.rollback() calls after caught exceptions, leaving the SQLAlchemy
session in an invalid state. When session.flush() was subsequently called,
it would raise a new exception that wasn't caught, causing the DAG processor
to crash and enter restart loops.

This issue was observed in production environments where the DAG processor
would restart 1,259 times in 4 days (~13 restarts/hour), leading to:

  • Connection pool exhaustion
  • Cascading failures across Airflow components
  • Import errors not being recorded in the UI
  • System instability

Changes

  • Add session.rollback() after caught exceptions in _update_import_errors()
  • Add session.rollback() after caught exceptions in _update_dag_warnings()
  • Wrap session.flush() in try-except with session.rollback() on failure
  • Add comprehensive unit tests for all failure scenarios
  • Update comments to clarify error handling behavior
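The pattern behind these changes can be sketched without a database. The block below is only an illustration under stated assumptions: `FakeSession`, `PendingRollback`, and `record_parsing_results` are hypothetical names, not Airflow or SQLAlchemy code. The fake session imitates one behavior of a real session: after a failed operation, every later call raises until `rollback()` is called.

```python
class PendingRollback(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.PendingRollbackError."""


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session (not Airflow code)."""

    def __init__(self, fail_on=()):
        self.fail_on = set(fail_on)  # operation names that fail once
        self.needs_rollback = False
        self.rollbacks = 0

    def _run(self, op):
        if self.needs_rollback:
            raise PendingRollback(f"{op}: roll back before reusing the session")
        if op in self.fail_on:
            self.fail_on.discard(op)
            self.needs_rollback = True
            raise ConnectionError(f"{op}: lost connection to database")

    def execute(self, op):
        self._run(op)

    def flush(self):
        self._run("flush")

    def rollback(self):
        self.needs_rollback = False
        self.rollbacks += 1


def record_parsing_results(session):
    """Return True if the final flush succeeded; never raise on DB failure."""
    for op in ("update_import_errors", "update_dag_warnings"):
        try:
            session.execute(op)
        except Exception:
            # Without this rollback, the next call on the session would raise.
            session.rollback()
    try:
        session.flush()
    except Exception:
        session.rollback()
        return False
    return True


session = FakeSession(fail_on={"update_import_errors"})
print(record_parsing_results(session), session.rollbacks)
```

Removing either `session.rollback()` call from the loop makes the later `flush()` raise `PendingRollback`, which mirrors the crash described above.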

Testing

Added 5 new unit tests in TestDagProcessorCrashFix class:

  • test_update_dag_parsing_results_handles_db_failure_gracefully
  • test_update_dag_parsing_results_handles_dag_warnings_db_failure_gracefully
  • test_update_dag_parsing_results_handles_session_flush_failure_gracefully
  • test_session_rollback_called_on_import_errors_failure
  • test_session_rollback_called_on_dag_warnings_failure

All tests pass and verify that:

  1. Database failures don't crash the DAG processor
  2. session.rollback() is called correctly on failures
  3. The processor continues gracefully after errors
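A minimal sketch of how such a check can be written with `unittest.mock`. The helper `flush_safely` and both test names are hypothetical and stand in for the real code paths; they are not the tests added in this PR.

```python
from unittest import mock


def flush_safely(session, log):
    """Hypothetical helper: flush, then log and roll back on failure."""
    try:
        session.flush()
    except Exception:
        log.exception("Failed to flush DAG parsing results")
        session.rollback()


def test_rollback_called_when_flush_fails():
    session = mock.MagicMock()
    session.flush.side_effect = RuntimeError("connection lost")
    log = mock.MagicMock()

    flush_safely(session, log)  # must not raise

    session.rollback.assert_called_once_with()
    log.exception.assert_called_once()


def test_no_rollback_when_flush_succeeds():
    session = mock.MagicMock()

    flush_safely(session, mock.MagicMock())

    session.rollback.assert_not_called()
```

Mocking the session keeps the test independent of any database backend, at the cost of not exercising the real session state machine.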

Impact

The fix ensures the DAG processor gracefully handles database connection
failures and continues processing other DAGs instead of crashing, preventing
production outages from restart loops.

@potiuk potiuk added the backport-to-v3-1-test label Dec 8, 2025
@potiuk potiuk modified the milestone: Airflow 3.1.4 Dec 8, 2025
@potiuk
Member

potiuk commented Dec 8, 2025

Thanks. Nice one.

@potiuk
Member

potiuk commented Dec 8, 2025

From https://docs.sqlalchemy.org/en/20/orm/session_basics.html#flushing

When a failure occurs within a flush, in order to continue using that same Session, an explicit call to Session.rollback() is required after a flush fails, even though the underlying transaction will have been rolled back already (even if the database driver is technically in driver-level autocommit mode). This is so that the overall nesting pattern of so-called “subtransactions” is consistently maintained. The FAQ section “This Session’s transaction has been rolled back due to a previous exception during flush.” (or similar) contains a more detailed description of this behavior.

@wjddn279
Contributor

The root cause of the MySQL connection failure may be #56879.
Aside from import error recording, have you encountered any other connection failures?

@AmosG
Author

AmosG commented Dec 10, 2025

> Maybe, the root cause of the MySQL connection failure is #56879. Aside from import error recording, have you ever encountered any connection failures?

Totally agree, @wjddn279.
Even more, the fix I suggested here kind of silences the issue you started dealing with, because the in-flight transactions are rolled back; the connection close is handled in Airflow in a way that crashes the service.

@AmosG AmosG marked this pull request as draft December 10, 2025 07:52
@potiuk
Member

potiuk commented Dec 10, 2025

> Maybe, the root cause of the MySQL connection failure is #56879. Aside from import error recording, have you ever encountered any connection failures?

> Totally agree, @wjddn279. Even more, the fix I suggested here kind of silences the issue you started dealing with, because the in-flight transactions are rolled back; the connection close is handled in Airflow in a way that crashes the service.

Yeah. Worth fixing it with gc freezing I think.

@john-rodriguez-mgni

Do we know if the session.rollback() fix will be included in the upcoming 3.1.6 release?

@potiuk
Member

potiuk commented Jan 21, 2026

Fix from #60505 merged - @john-rodriguez-mgni - you seem to be very eager to find out when things are released - I hope you will equally eagerly apply the test to your case and test the rcs when they are out (subscribe to devlist to find out).

@AmosG
Author

AmosG commented Jan 27, 2026

So are we expecting this to be solved in 3.1.7 by #60505?

I think I also see this issue on the api-server, where it causes an endless loop that makes it impossible to log in:

```
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3680, in get
    |     return self._get_impl(
    |            ^^^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3859, in _get_impl
    |     return db_load_fn(
    |            ^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/loading.py", line 695, in load_on_pk_identity
    |     session.execute(
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
    |     return self._execute_internal(
    |            ^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2249, in _execute_internal
    |     result: Result[Any] = compile_state_cls.orm_execute_statement(
    |                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/context.py", line 306, in orm_execute_statement
    |     result = conn.execute(
    |              ^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1419, in execute
    |     return meth(
    |            ^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 527, in _execute_on_connection
    |     return connection._execute_clauseelement(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1641, in _execute_clauseelement
    |     ret = self._execute_context(
    |           ^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1813, in _execute_context
    |     conn = self._revalidate_connection()
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 673, in _revalidate_connection
    |     self._invalid_transaction()
    |   File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 663, in _invalid_transaction
    |     raise exc.PendingRollbackError(
    | sqlalchemy.exc.PendingRollbackError: Can't reconnect until invalid transaction is rolled back.  Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)
    +------------------------------------
```
