Flaky dask backfill test in quarantine #32778

@potiuk

We have recently started to observe that the tests/executors/test_dask_executor.py::TestDaskExecutor::test_backfill_integration test is very flaky. The Python 3.8 + postgres 3.11 combo seems to trigger the failure especially easily, but not always.

Example of failure here:

https://github.com/apache/airflow/actions/runs/5632434844/job/15260418883?pr=32776

Example errors:

E       psycopg2.errors.DeadlockDetected: deadlock detected
E       DETAIL:  Process 604 waits for ShareLock on transaction 7154; blocked by process 690.
E       Process 690 waits for ShareLock on transaction 7152; blocked by process 604.
E       HINT:  See server log for query details.
E       CONTEXT:  while updating tuple (2,204) in relation "dag_run"
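
Read together, the two DETAIL lines describe a wait cycle: each process is blocked by a transaction the other one holds. Here is a minimal sketch of that reading in plain Python. The helper names `parse_waits` and `find_cycle` are made up for illustration and are not part of Airflow, psycopg2 or Postgres; the input is just the DETAIL text copied from the error above.

```python
import re

# DETAIL lines copied from the error above.
DETAIL = """\
Process 604 waits for ShareLock on transaction 7154; blocked by process 690.
Process 690 waits for ShareLock on transaction 7152; blocked by process 604.
"""

WAIT_RE = re.compile(
    r"Process (\d+) waits for \w+ on transaction \d+; blocked by process (\d+)\."
)


def parse_waits(detail):
    """Map each waiting process id to the process id that blocks it."""
    return {int(waiter): int(blocker) for waiter, blocker in WAIT_RE.findall(detail)}


def find_cycle(waits):
    """Return the process ids that form a wait cycle, or None if there is none."""
    for start in waits:
        seen = [start]
        current = waits.get(start)
        # Follow the "blocked by" chain until it ends or repeats.
        while current is not None and current not in seen:
            seen.append(current)
            current = waits.get(current)
        if current == start:
            return seen
    return None


waits = parse_waits(DETAIL)
cycle = find_cycle(waits)
print(waits, cycle)
```

Both processes show up in the cycle, which matches the CONTEXT line: two transactions are competing over rows in "dag_run". Which two code paths they belong to is exactly what needs tracking down; the log alone does not say.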

Details:

self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7fd29cc25880>
cursor = <cursor object at 0x7fd29c8589a0; closed: -1>
statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
parameters = {'dag_run_id': 23, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 22, 19, 58, 26, 211427, tzinfo=Timezone('UTC'))}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7fd27524c9a0>
airflow/jobs/backfill_job_runner.py:914: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/jobs/backfill_job_runner.py:802: in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
airflow/jobs/backfill_job_runner.py:645: in _process_backfill_task_instances
    session.commit()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit
    self._transaction.commit(_to_root=self.future)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit
    self._prepare_impl()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl
    self.session.flush()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush
    self._flush(objects)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush
    transaction.rollback(_capture_exception=True)
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
    compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3549: in _flush
    flush_context.execute()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute
    rec.execute(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute
    util.preloaded.orm_persistence.save_obj(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj
    _emit_update_statements(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements
    c = connection._execute_20(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

Eventually failing with:

E                       sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.DeadlockDetected) deadlock detected
E                       DETAIL:  Process 604 waits for ShareLock on transaction 7154; blocked by process 690.
E                       Process 690 waits for ShareLock on transaction 7152; blocked by process 604.
E                       HINT:  See server log for query details.
E                       CONTEXT:  while updating tuple (2,204) in relation "dag_run"
E                       
E                       [SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
E                       [parameters: {'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 22, 19, 58, 26, 211427, tzinfo=Timezone('UTC')), 'dag_run_id': 23}]
E                       (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on this error at: https://sqlalche.me/e/14/7s2a)
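
The PendingRollbackError above is a follow-on failure: the session was used again after the deadlock without Session.rollback() being called first. As a possible direction only, and not what backfill_job_runner.py currently does, here is a sketch of a roll-back-and-retry wrapper around a commit. Everything in it is hypothetical: `DeadlockDetected` is a local stand-in for psycopg2.errors.DeadlockDetected, and `FakeSession` and `commit_with_retry` exist only so the sketch runs without a database. In real code the error would arrive wrapped by SQLAlchemy, so the except clause would need adjusting.

```python
import time


class DeadlockDetected(Exception):
    """Local stand-in for psycopg2.errors.DeadlockDetected (hypothetical)."""


def commit_with_retry(session, apply_changes, retries=3, delay=0.0):
    """Apply changes and commit; on deadlock, roll back and try again.

    Returns the number of attempts that were needed.
    """
    for attempt in range(1, retries + 1):
        try:
            apply_changes(session)
            session.commit()
            return attempt
        except DeadlockDetected:
            # Without this rollback, the next use of the session would fail
            # with PendingRollbackError, as in the report above.
            session.rollback()
            if attempt == retries:
                raise
            time.sleep(delay * attempt)


class FakeSession:
    """Hypothetical session whose first commit deadlocks, later ones succeed."""

    def __init__(self):
        self.commits = 0
        self.rollbacks = 0

    def commit(self):
        self.commits += 1
        if self.commits == 1:
            raise DeadlockDetected("deadlock detected")

    def rollback(self):
        self.rollbacks += 1


session = FakeSession()
attempts = commit_with_retry(session, lambda s: None)
print(attempts, session.commits, session.rollbacks)
```

A retry like this would only hide the symptom. It does not explain why two processes update the same "dag_run" row concurrently in this test, and whether retrying is even safe at that point in the backfill loop is an open question.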

It would be great to track it down.

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.

Labels

  • Quarantine: Issues that are occasionally failing and are quarantined
  • affected_version:main_branch: Issues Reported for main branch
  • area:Scheduler: including HA (high availability) scheduler
  • area:backfill: Specifically for backfill related
  • area:core
  • kind:meta: High-level information important to the community
  • priority:high: High priority bug that should be patched quickly but does not require immediate new release
