Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.7.3
What happened?
My team is working on upgrading an old system to the latest stable version of Airflow. We are currently stuck on the 2.7.3 upgrade. The first time a test that calls _get_or_create_dagrun in airflow/models/dag.py is run on 2.7.3, it passes. All subsequent test runs fail with the following error:
sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'
What you think should happen instead?
The error occurs when running a dag with the same dag id and execution date more than once, which happens in our integration tests. The if dr: block below runs, and the error is raised when session.commit() is called.
def _get_or_create_dagrun(...) -> DagRun:
    dr: DagRun = session.scalar(
        select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
    )
    if dr:
        session.delete(dr)
        session.commit()  # this line
    dr = ...
I believe that rerunning a dag id on an execution date should either work or raise a clear error about overwriting a previous dag run.
How to reproduce
In our system this is reproducible in tests on any dag with a hard-coded execution date. Here is a sample setup. I can fill out the classes if it is helpful.
dags/my_dag.py
def my_dag(dag_config: MyConfig) -> DAG:
    with DAG(...) as dag:
        my_operator = MyOperator(...)
        other_operator = ...
    return dag
test/integration/test_my_dag.py
# test setup
# ...
DAG_RUN_DATE = datetime(
    year=2024,
    month=4,
    day=26,
    hour=3,
    minute=0,
    tzinfo=pendulum.timezone("America/New_York"),
)
dag_config = MyConfig(
    dag_id="my_test_dag",
    ...
)
dag = my_dag(dag_config)
run_dag(dag, DAG_RUN_DATE)
# test assertions
# ...
test/helpers.py
from airflow.utils.session import provide_session

@provide_session
def run_dag(dag, date, session=None):
    dag.test(
        execution_date=date,
    )
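To make the failure mode explicit, here is a condensed sketch of two consecutive runs using the hypothetical names from the setup above (it assumes the Airflow metadata database persists between test runs):

# condensed sketch, hypothetical names from the setup above
dag = my_dag(dag_config)
run_dag(dag, DAG_RUN_DATE)  # first run with this dag_id + execution_date passes
run_dag(dag, DAG_RUN_DATE)  # any later run finds the existing DagRun, deletes it,
                            # and session.commit() raises NoReferencedTableError for ab_user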
Operating System
CentOS 7
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.10.0
apache-airflow-providers-celery==3.4.1
apache-airflow-providers-cncf-kubernetes==7.8.0
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-datadog==3.4.0
apache-airflow-providers-docker==3.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-hashicorp==3.5.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-jenkins==3.4.0
apache-airflow-providers-microsoft-azure==8.1.0
apache-airflow-providers-opsgenie==5.2.0
apache-airflow-providers-postgres==5.7.1
apache-airflow-providers-redis==3.4.0
apache-airflow-providers-slack==8.3.0
apache-airflow-providers-sqlite==3.5.0
apache-airflow-providers-ssh==3.8.1
Deployment
Docker-Compose
Deployment details
No response
Anything else?
I have found a number of issues in this repository that describe similar loading problems with the ab_user table. None of them mention running a dag twice as part of the reproduction steps.
- Quick Start documentation issue #34191
- sqlalchemy error when running CLI command airflow tasks test #34109
- Clean Postgres database on Python3.10 gives foreign key error #34859
We wrote a patch to fix the issue. It will get us through the Airflow 2.7.3 upgrade so we can continue upgrading to later versions. The patch copies _get_or_create_dagrun and adds an import of the FAB User model at the top; we believe this works because the import registers the ab_user table in the SQLAlchemy metadata, so the dag_run_note.user_id foreign key can be resolved when the existing DagRun is deleted. We don't understand why other people haven't hit the same problem.
def patched_get_or_create_dagrun(...) -> DagRun:
    # CHANGES
    from airflow.auth.managers.fab.models import User
    from sqlalchemy import select
    # END OF CHANGES
    dr: DagRun = session.scalar(
        select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
    )
    if dr:
        session.delete(dr)
        session.commit()  # this line
    ...

import airflow.models.dag as af_dag
af_dag._get_or_create_dagrun = patched_get_or_create_dagrun
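Assuming the root cause really is that the ab_user table is only registered in the SQLAlchemy metadata once the FAB models are imported, a lighter-weight alternative might be to do the same import once in the test suite instead of monkeypatching. This is only a sketch we have not fully validated:

# test/conftest.py (hypothetical; relies on the same import as the patch above,
# applied once for the whole test session instead of monkeypatching)
from airflow.auth.managers.fab.models import User  # noqa: F401  side effect: registers ab_user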
Our integration tests generally upload data to an internal S3 analog and use the file path to partition the data based on the date. Making this system dynamic would be a pretty big rewrite, so we are looking for options. Is there a standard practice for Airflow integration testing with hard-coded dates that we have missed? Are we doing something out of the ordinary for Airflow?
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct