Title: Error When Using Multiple Datasets as Triggers in Apache Airflow #36837
Apache Airflow version
2.8.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When using two or more Datasets as triggers, an error occurs while accessing the source_task_instance attribute of a DatasetEvent.
What you think should happen instead?
It should be possible to access all fields of every DatasetEvent.
How to reproduce
Create two test DAGs (producer and consumer) and link them with two Datasets.
Code:
from __future__ import annotations

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import task, dag

testDataSet = Dataset('/test')
testDataSet2 = Dataset('/test2')


@dag(
    dag_id='dag_producer',
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def dag_producer():
    @task(outlets=[testDataSet, testDataSet2])
    def test():
        return 'OK'

    test()


@dag(
    dag_id="dag_consumer",
    schedule=[testDataSet, testDataSet2],
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def dag_consumer():
    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            for dataset_event in dataset_list:
                print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test'))

    print_triggering_dataset_events()


dag_consumer()
dag_producer()
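For contrast, and consistent with the "With one Dataset: OK" observation under "Anything else?" below, the same pattern runs cleanly when the DAGs are linked by a single Dataset. A minimal sketch; the *_single dag ids and the '/test_single' URI are hypothetical names for this variant:

from __future__ import annotations

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import task, dag

singleDataSet = Dataset('/test_single')  # hypothetical URI for this variant


@dag(
    dag_id='dag_producer_single',  # hypothetical id
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def dag_producer_single():
    @task(outlets=[singleDataSet])  # one outlet instead of two
    def test():
        return 'OK'

    test()


@dag(
    dag_id='dag_consumer_single',  # hypothetical id
    schedule=[singleDataSet],  # one Dataset trigger instead of two
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def dag_consumer_single():
    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            for dataset_event in dataset_list:
                # With a single Dataset, this lazy load succeeds per the report.
                print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test'))

    print_triggering_dataset_events()


dag_consumer_single()
dag_producer_single()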
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
airflow@4e6c155ede93:/opt/airflow$ pip freeze | grep provider
apache-airflow-providers-amazon==8.13.0
apache-airflow-providers-celery==3.5.0
apache-airflow-providers-cncf-kubernetes==7.11.0
apache-airflow-providers-common-io==1.1.0
apache-airflow-providers-common-sql==1.9.0
apache-airflow-providers-docker==3.8.2
apache-airflow-providers-elasticsearch==5.3.0
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.12.0
apache-airflow-providers-grpc==3.4.0
apache-airflow-providers-hashicorp==3.6.0
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==8.4.0
apache-airflow-providers-mysql==5.5.0
apache-airflow-providers-odbc==4.2.0
apache-airflow-providers-openlineage==1.3.0
apache-airflow-providers-postgres==5.9.0
apache-airflow-providers-redis==3.5.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.8.0
apache-airflow-providers-slack==8.5.0
apache-airflow-providers-snowflake==5.2.0
apache-airflow-providers-sqlite==3.6.0
apache-airflow-providers-ssh==3.9.0
Deployment
Docker-Compose
Deployment details
No response
Anything else?
Error in log:
*** Found local files:
*** * /opt/airflow/logs/dag_id=dag_consumer/run_id=dataset_triggered__2024-01-17T10:15:09.140192+00:00/task_id=print_triggering_dataset_events/attempt=1.log
[2024-01-17, 18:15:10 +08] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_consumer.print_triggering_dataset_events dataset_triggered__2024-01-17T10:15:09.140192+00:00 [queued]>
[2024-01-17, 18:15:10 +08] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_consumer.print_triggering_dataset_events dataset_triggered__2024-01-17T10:15:09.140192+00:00 [queued]>
[2024-01-17, 18:15:10 +08] {taskinstance.py:2171} INFO - Starting attempt 1 of 1
[2024-01-17, 18:15:10 +08] {taskinstance.py:2192} INFO - Executing <Task(_PythonDecoratedOperator): print_triggering_dataset_events> on 2024-01-17 10:15:09.140192+00:00
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:60} INFO - Started process 2656 to run task
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'dag_consumer', 'print_triggering_dataset_events', 'dataset_triggered__2024-01-17T10:15:09.140192+00:00', '--job-id', '1641', '--raw', '--subdir', 'DAGS_FOLDER/testdag.py', '--cfg-path', '/tmp/tmpeer76u97']
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:88} INFO - Job 1641: Subtask print_triggering_dataset_events
[2024-01-17, 18:15:10 +08] {warnings.py:109} WARNING - /home/***/.local/lib/python3.8/site-packages/***/settings.py:195: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
[2024-01-17, 18:15:10 +08] {task_command.py:423} INFO - Running <TaskInstance: dag_consumer.print_triggering_dataset_events dataset_triggered__2024-01-17T10:15:09.140192+00:00 [running]> on host 4e6c155ede93
[2024-01-17, 18:15:10 +08] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dag_consumer' AIRFLOW_CTX_TASK_ID='print_triggering_dataset_events' AIRFLOW_CTX_EXECUTION_DATE='2024-01-17T10:15:09.140192+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='dataset_triggered__2024-01-17T10:15:09.140192+00:00'
[2024-01-17, 18:15:10 +08] {logging_mixin.py:188} INFO - Task result: OK
[2024-01-17, 18:15:10 +08] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 242, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/testdag.py", line 38, in print_triggering_dataset_events
    print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test'))
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 487, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 959, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 995, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/strategies.py", line 863, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DatasetEvent at 0x7f66e6521eb0> is not bound to a Session; lazy load operation of attribute 'source_task_instance' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
[2024-01-17, 18:15:10 +08] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=dag_consumer, task_id=print_triggering_dataset_events, execution_date=20240117T101509, start_date=20240117T101510, end_date=20240117T101510
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:107} ERROR - Failed to execute job 1641 for task print_triggering_dataset_events (Parent instance <DatasetEvent at 0x7f66e6521eb0> is not bound to a Session; lazy load operation of attribute 'source_task_instance' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3); 2656)
[2024-01-17, 18:15:10 +08] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-17, 18:15:10 +08] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check
In the UI:
With two Datasets: error.
With one Dataset: OK.
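A possible workaround (an untested sketch, not a confirmed fix): since the DatasetEvent apparently arrives detached from any SQLAlchemy session, re-attaching it with session.merge() before touching the lazy-loaded source_task_instance relationship should let the load proceed. create_session comes from airflow.utils.session; everything else is as in the reproduction DAG above:

from airflow.decorators import task
from airflow.utils.session import create_session


@task
def print_triggering_dataset_events(triggering_dataset_events=None):
    for dataset, dataset_list in triggering_dataset_events.items():
        for dataset_event in dataset_list:
            # Re-attach the detached DatasetEvent to a live session so the
            # lazy load of 'source_task_instance' can issue its query.
            with create_session() as session:
                event = session.merge(dataset_event)
                print('Task result: ', event.source_task_instance.xcom_pull(task_ids='test'))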
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct