Run DAG in isolated session #29803

@dinigo

Description

Apache Airflow version

2.5.1

What happened

While trying the new airflow.models.DAG.test function to run e2e tests on a DAG in a pytest fashion, I found there is no way to force it to write to a database other than the configured one.

This fixture should create a SQLAlchemy session for an in-memory database, initialise the database, and then use it for the test:

from pytest import fixture
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from airflow.utils.db import initdb


@fixture(scope="session")
def airflow_db():
    # in-memory SQLite database
    engine = create_engine("sqlite://")
    with Session(engine) as db_session:
        initdb(session=db_session, load_connections=False)
        yield db_session


def test_dag_runs_default(airflow_db):
    dag.test(session=airflow_db)

However, initdb never uses the engine bound to the session it receives. It creates the tables with the engine from settings instead of the engine from the session:
https://github.com/apache/airflow/blob/main/airflow/utils/db.py#L694-L695

    with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
        Base.metadata.create_all(settings.engine)
        Model.metadata.create_all(settings.engine)

Then _create_flask_session_tbl() reads the database URL from the config again (which may or may not match the one settings was initialised with) and, once more, creates all the Airflow tables in a database different from the one bound to the provided session.
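A possible direction for a fix (my sketch, not Airflow's actual code): have the table-creation step honour the engine bound to the provided session and only fall back to the global engine when no session is given. A minimal self-contained SQLAlchemy demonstration, with settings_engine standing in for airflow.settings.engine:

```python
from sqlalchemy import Column, Integer, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Demo(Base):
    __tablename__ = "demo"
    id = Column(Integer, primary_key=True)


# Stands in for airflow.settings.engine (the globally configured engine).
settings_engine = create_engine("sqlite://")
# The in-memory engine the test actually wants to use.
test_engine = create_engine("sqlite://")

with Session(test_engine) as session:
    # Honour the session's bind; fall back to the global engine otherwise.
    engine = session.get_bind() if session is not None else settings_engine
    Base.metadata.create_all(engine)

# The tables end up in the session's database, not the global one.
print(inspect(test_engine).has_table("demo"))      # True
print(inspect(settings_engine).has_table("demo"))  # False
```

With this pattern, the injected-session case behaves exactly as today, while an explicitly provided session gets its own engine respected.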

What you think should happen instead

The SQLAlchemy base, the models and the Airflow tables should all be created in the database bound to the provided session.

When the session is injected from settings, this still matches the config. But when the caller provides a session explicitly, initdb should use that session's engine instead.

How to reproduce

This inits the database specified in the config (which defaults to ${HOME}/airflow/airflow.db), then the test tries to use the in-memory one and breaks:

from pytest import fixture
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from airflow.utils.db import initdb


@fixture(scope="session")
def airflow_db():
    # in-memory SQLite database
    engine = create_engine("sqlite://")
    with Session(engine) as db_session:
        initdb(session=db_session, load_connections=False)
        yield db_session


def test_dag_runs_default(airflow_db):
    dag.test(session=airflow_db)
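In the meantime, the only workaround I can think of (an assumption on my side; the database config section applies to Airflow 2.3+) is not to pass a session at all, but to point the configured metadata DB at a disposable location via environment variables before anything from airflow is imported, since settings reads the config at import time:

```python
import os
import tempfile

# Must run before the first "import airflow": settings reads the config
# when the package is imported. Point the metadata DB at a throwaway
# SQLite file in a temporary AIRFLOW_HOME.
airflow_home = tempfile.mkdtemp()
os.environ["AIRFLOW_HOME"] = airflow_home
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = (
    "sqlite:///" + os.path.join(airflow_home, "airflow.db")
)
```

This keeps the real ${HOME}/airflow/airflow.db untouched, but it is process-wide state rather than the per-session isolation the fixture above is trying to achieve.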

Operating System

macOS

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
