[Bug] Run airflow dags test
with DatasetAlias
/Dataset
raises sqlalchemy.orm.exc.FlushError
#42495
Open
Description
Apache Airflow version
2.10.0, 2.10,1, 2.10.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Given the DAG:
from datetime import datetime
from airflow import DAG, Dataset
from airflow.datasets import DatasetAlias
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
ALIAS_NAME = "some-alias"
class CustomOperator(BaseOperator):
def __init__(self, *args, **kwargs):
kwargs["outlets"] = [DatasetAlias(name=ALIAS_NAME)]
super().__init__(*args, **kwargs)
def execute(self, context: Context):
new_outlets = [Dataset("something")]
for outlet in new_outlets:
context["outlet_events"][ALIAS_NAME].add(outlet)
with DAG("dataset_alias_dag", start_date=datetime(2023, 4, 20)) as dag:
do_something = CustomOperator(task_id="do_something")
do_something
When I try to run:
airflow dags test dataset_alias_dag `date -Iseconds`
I get the error:
[2024-09-26T11:46:37.012+0300] {dag.py:3060} ERROR - Task failed; ti=<TaskInstance: dataset_alias_dag.do_something manual__2024-09-26T11:46:32+03:00 [success]>
Traceback (most recent call last):
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/dag.py", line 3053, in test
_run_task(
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/dag.py", line 4357, in _run_task
ti._run_raw_task(session=session, raise_on_defer=inline_trigger, mark_success=mark_success)
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2995, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 363, in _run_raw_task
ti._register_dataset_changes(events=context["outlet_events"], session=session)
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3058, in _register_dataset_changes
dataset_manager.register_dataset_change(
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 139, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/airflow/datasets/manager.py", line 145, in register_dataset_change
session.flush()
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 579, in execute
self.dependency_processor.process_saves(uow, states)
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 1136, in process_saves
if not self._synchronize(
^^^^^^^^^^^^^^^^^^
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 1252, in _synchronize
self._verify_canload(child)
File "/Users/tati/Library/Application Support/hatch/env/virtual/astronomer-cosmos/4VBJdS-x/tests.py3.11-2.10/lib/python3.11/site-packages/sqlalchemy/orm/dependency.py", line 257, in _verify_canload
raise exc.FlushError(
sqlalchemy.orm.exc.FlushError: Can't flush None value found in collection DatasetModel.aliases
This DAG successfully executes when not being triggered via the dags test
command.
What you think should happen instead?
I should be able to run dags test
for this DAG without seeing this error message.
How to reproduce
Already described.
Operating System
Any
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
It is not happening during deployment (tested in Astronomer, and it worked fine).
The issue happens when running the airflow dags test
command locally
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct