[Bug] Run airflow dags test with DatasetAlias/Dataset raises sqlalchemy.orm.exc.FlushError #42495

Open
@tatiana

Description

Apache Airflow version

2.10.0, 2.10.1, 2.10.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Given the DAG:

from datetime import datetime

from airflow import DAG, Dataset
from airflow.datasets import DatasetAlias
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context


ALIAS_NAME = "some-alias"


class CustomOperator(BaseOperator):

    def __init__(self, *args, **kwargs):
        kwargs["outlets"] = [DatasetAlias(name=ALIAS_NAME)]
        super().__init__(*args, **kwargs)

    def execute(self, context: Context):
        new_outlets = [Dataset("something")]
        for outlet in new_outlets:
            context["outlet_events"][ALIAS_NAME].add(outlet)


with DAG("dataset_alias_dag", start_date=datetime(2023, 4, 20)) as dag:
    do_something = CustomOperator(task_id="do_something")
    do_something

When I try to run:

airflow dags test dataset_alias_dag  `date -Iseconds`

I get the error:

[2024-09-26T11:46:37.012+0300] {dag.py:3060} ERROR - Task failed; ti=<TaskInstance: dataset_alias_dag.do_something manual__2024-09-26T11:46:32+03:00 [success]>
Traceback (most recent call last):
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/dag.py", line 3053, in test
    _run_task(
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/dag.py", line 4357, in _run_task
    ti._run_raw_task(session=session, raise_on_defer=inline_trigger, mark_success=mark_success)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2995, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 363, in _run_raw_task
    ti._register_dataset_changes(events=context["outlet_events"], session=session)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3058, in _register_dataset_changes
    dataset_manager.register_dataset_change(
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 139, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/datasets/manager.py", line 145, in register_dataset_change
    session.flush()
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 579, in execute
    self.dependency_processor.process_saves(uow, states)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 1136, in process_saves
    if not self._synchronize(
           ^^^^^^^^^^^^^^^^^^
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 1252, in _synchronize
    self._verify_canload(child)
  File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 257, in _verify_canload
    raise exc.FlushError(
sqlalchemy.orm.exc.FlushError: Can't flush None value found in collection DatasetModel.aliases

The DAG executes successfully when it is not triggered via the airflow dags test command.
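The traceback ends in SQLAlchemy, so it may help to see what that final error means in isolation. The sketch below is not Airflow code: ToyDataset, ToyAlias and the toy_* tables are hypothetical stand-ins for DatasetModel, its alias model and their association table. It also assumes that the None comes from looking up an alias row that does not exist yet, which is only a guess; this report does not show where the None is appended.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, Table, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship
from sqlalchemy.orm.exc import FlushError

Base = declarative_base()

# Hypothetical many-to-many association table (not Airflow's real schema).
link = Table(
    "toy_alias_dataset",
    Base.metadata,
    Column("alias_id", Integer, ForeignKey("toy_alias.id"), primary_key=True),
    Column("dataset_id", Integer, ForeignKey("toy_dataset.id"), primary_key=True),
)


class ToyAlias(Base):
    __tablename__ = "toy_alias"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)


class ToyDataset(Base):
    __tablename__ = "toy_dataset"
    id = Column(Integer, primary_key=True)
    uri = Column(String, unique=True, nullable=False)
    aliases = relationship(ToyAlias, secondary=link)


def flush_with_missing_alias() -> str:
    """Append the result of a failed lookup to a collection, then flush."""
    engine = create_engine("sqlite://")  # in-memory database
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        # Assumption: no alias row was ever created, so the lookup yields None.
        alias = session.query(ToyAlias).filter_by(name="some-alias").one_or_none()
        dataset = ToyDataset(uri="something")
        dataset.aliases.append(alias)  # silently accepts None
        session.add(dataset)
        try:
            session.flush()  # the None is only rejected here
        except FlushError as exc:
            return str(exc)
    return "flushed without error"


if __name__ == "__main__":
    print(flush_with_missing_alias())
```

The point of the sketch is that the append itself does not fail; the error only surfaces at session.flush(), which matches the last frames of the traceback above. If the guess holds, the place to look would be wherever the alias object is resolved before it is attached to the dataset.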

What you think should happen instead?

I should be able to run dags test for this DAG without seeing this error message.

How to reproduce

Already described.

Operating System

Any

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

This does not happen in a deployed environment (tested on Astronomer, where it worked fine).
The issue happens when running the airflow dags test command locally.

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Assignees

No one assigned

Labels

area:core, area:datasets (Issues related to the datasets feature), kind:bug (This is a clearly a bug)
