
Rerunning a dag in Airflow 2.7.3 causes a missing table issue #41894

@jeremiahishere

Description


Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.3

What happened?

My team is working on upgrading an old system to the latest stable version of Airflow. We are currently stuck on the 2.7.3 upgrade. The first time a test runs in 2.7.3 that calls _get_or_create_dagrun in airflow/models/dag.py, it passes. All subsequent test runs fail with the following error:

sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'

What you think should happen instead?

The error occurs when running a DAG with the same dag id and execution date more than once, which is what happens in our integration tests. The if dr: block below runs, and the error is raised when session.commit() is called.

def _get_or_create_dagrun(...) -> DagRun:
    # look up an existing run for this dag id and execution date
    dr: DagRun = session.scalar(
        select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
    )
    if dr:
        # delete the existing run so a fresh one can be created below
        session.delete(dr)
        session.commit()  # this line raises NoReferencedTableError
    dr = ...

I believe that rerunning a dag id on the same execution date should either work, or raise an explicit error about overwriting a previous dag run.

How to reproduce

In our system this is reproducible in tests on any DAG with a hard-coded execution date. Here is a sample setup; I can fill out the classes if that is helpful.

dags/my_dag.py

def my_dag(dag_config: MyConfig) -> DAG:
    with DAG(...) as dag:
        my_operator = MyOperator(...)

        other_operator = ...

        return dag

test/integration/test_my_dag.py

# test setup
# ...

DAG_RUN_DATE = datetime(
    year=2024,
    month=4,
    day=26,
    hour=3,
    minute=0,
    tzinfo=pendulum.timezone("America/New_York"),
)

dag_config = MyConfig(
  dag_id="my_test_dag",
  ...
)

dag = my_dag(dag_config)
run_dag(dag, DAG_RUN_DATE)

# test assertions
# ...

test/helpers.py

from airflow.utils.session import provide_session

@provide_session
def run_dag(dag, date, session=None):
    dag.test(
        execution_date=date,
    )
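
For a more self-contained picture, a minimal version of the same pattern looks roughly like this (the DAG and operator here are placeholders, not our real code); the second dag.test call is the one that fails:

from datetime import datetime

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

DAG_RUN_DATE = datetime(
    year=2024,
    month=4,
    day=26,
    hour=3,
    minute=0,
    tzinfo=pendulum.timezone("America/New_York"),
)

with DAG(dag_id="my_test_dag", start_date=DAG_RUN_DATE, schedule=None) as dag:
    EmptyOperator(task_id="noop")

# first call passes; the second hits NoReferencedTableError in _get_or_create_dagrun
dag.test(execution_date=DAG_RUN_DATE)
dag.test(execution_date=DAG_RUN_DATE)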

Operating System

CentOS 7

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.10.0
apache-airflow-providers-celery==3.4.1
apache-airflow-providers-cncf-kubernetes==7.8.0
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-datadog==3.4.0
apache-airflow-providers-docker==3.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-hashicorp==3.5.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-jenkins==3.4.0
apache-airflow-providers-microsoft-azure==8.1.0
apache-airflow-providers-opsgenie==5.2.0
apache-airflow-providers-postgres==5.7.1
apache-airflow-providers-redis==3.4.0
apache-airflow-providers-slack==8.3.0
apache-airflow-providers-sqlite==3.5.0
apache-airflow-providers-ssh==3.8.1

Deployment

Docker-Compose

Deployment details

No response

Anything else?

I have found a number of issues in this GitHub project that describe similar loading problems with the ab_user table. None of them mention running a DAG twice as part of the reproduction steps.

We wrote a patch to work around the issue. This patch will get us through the Airflow 2.7.3 upgrade so we can continue upgrading, but we don't understand why other people haven't hit the same problem.

def patched_get_or_create_dagrun(...) -> DagRun:
    # CHANGES
    # Importing the FAB User model appears to make the ab_user table visible to
    # SQLAlchemy, so the dag_run_note.user_id foreign key can be resolved when
    # the existing run is deleted.
    from airflow.auth.managers.fab.models import User
    from sqlalchemy import select
    # END OF CHANGES

    dr: DagRun = session.scalar(
        select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
    )
    if dr:
        session.delete(dr)
        session.commit()  # the line that previously raised NoReferencedTableError
    ...

import airflow.models.dag as af_dag
af_dag._get_or_create_dagrun = patched_get_or_create_dagrun
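
For illustration, one place to apply the monkeypatch is a pytest conftest, so it runs before any test calls dag.test; the module path below is hypothetical, not our actual layout:

# test/conftest.py
import airflow.models.dag as af_dag

from test.patches import patched_get_or_create_dagrun  # wherever the patch is defined

af_dag._get_or_create_dagrun = patched_get_or_create_dagrun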

Our integration tests generally upload data to an internal S3 analog and use the file path to partition the data by date. Making this system dynamic would be a pretty big rewrite, so we are looking for options. Is there a standard practice for Airflow integration testing with hard-coded dates that we have missed? Are we doing something out of the ordinary for Airflow?
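
For context, the date-partitioned upload pattern described above is roughly the following (the dataset name and key layout are illustrative, not our real paths):

from datetime import datetime

# illustrative only: how the hard-coded run date ends up in the object key
def partitioned_key(dataset: str, run_date: datetime) -> str:
    return f"{dataset}/{run_date:%Y/%m/%d}/data.parquet"

# partitioned_key("my_dataset", DAG_RUN_DATE) -> "my_dataset/2024/04/26/data.parquet"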

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
