
Error When Using Multiple Datasets as Triggers in Apache Airflow #36837

Open · SoftDed opened this issue Jan 17, 2024 · 5 comments
Labels: area:core, area:datasets, good first issue, kind:bug

Comments

SoftDed commented Jan 17, 2024

Apache Airflow version

2.8.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When two or more Datasets are used as triggers, a sqlalchemy.orm.exc.DetachedInstanceError is raised when the task accesses the source_task_instance attribute of a DatasetEvent from triggering_dataset_events.

What you think should happen instead?

It should be possible to access all fields of every DatasetEvent.
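
For instance, a sketch like the following (illustrative only; the attribute names come from the DatasetEvent model, and it assumes the same TaskFlow setup as the reproduction below) should run without errors:

from airflow.decorators import task


@task
def inspect_triggering_events(triggering_dataset_events=None):
    for uri, events in triggering_dataset_events.items():
        for event in events:
            # Plain columns on DatasetEvent:
            print(uri, event.source_dag_id, event.source_task_id, event.source_run_id)
            # Lazily loaded relationship that currently raises DetachedInstanceError
            # when more than one Dataset triggers the run:
            print(event.source_task_instance)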

How to reproduce

Create two test DAGs (producer and consumer) and link them with two Datasets.
Code:

from __future__ import annotations

from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import task, dag

testDataSet = Dataset('/test')
testDataSet2 = Dataset('/test2')


@dag(
    dag_id='dag_producer',
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def dag_producer():
    @task(outlets=[testDataSet, testDataSet2])
    def test():
        return 'OK'

    test()


@dag(
    dag_id="dag_consumer",
    schedule=[testDataSet, testDataSet2],
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def dag_consumer():
    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            for dataset_event in dataset_list:
                print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test'))

    print_triggering_dataset_events()


dag_consumer()
dag_producer()
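
Triggering dag_producer manually once updates both Datasets through the task's outlets, which schedules a dataset-triggered run of dag_consumer; its print_triggering_dataset_events task prints "Task result: OK" for the first DatasetEvent and then fails with the traceback below.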

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

airflow@4e6c155ede93:/opt/airflow$ pip freeze | grep provider
apache-airflow-providers-amazon==8.13.0
apache-airflow-providers-celery==3.5.0
apache-airflow-providers-cncf-kubernetes==7.11.0
apache-airflow-providers-common-io==1.1.0
apache-airflow-providers-common-sql==1.9.0
apache-airflow-providers-docker==3.8.2
apache-airflow-providers-elasticsearch==5.3.0
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.12.0
apache-airflow-providers-grpc==3.4.0
apache-airflow-providers-hashicorp==3.6.0
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==8.4.0
apache-airflow-providers-mysql==5.5.0
apache-airflow-providers-odbc==4.2.0
apache-airflow-providers-openlineage==1.3.0
apache-airflow-providers-postgres==5.9.0
apache-airflow-providers-redis==3.5.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.8.0
apache-airflow-providers-slack==8.5.0
apache-airflow-providers-snowflake==5.2.0
apache-airflow-providers-sqlite==3.6.0
apache-airflow-providers-ssh==3.9.0

Deployment

Docker-Compose

Deployment details

No response

Anything else?

Error in log:

4e6c155ede93
*** Found local files:
***   * /opt/airflow/logs/dag_id=dag_consumer/run_id=dataset_triggered__2024-01-17T10:15:09.140192+00:00/task_id=print_triggering_dataset_events/attempt=1.log
[2024-01-17, 18:15:10 +08] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_consumer.print_triggering_dataset_events dataset_triggered__2024-01-17T10:15:09.140192+00:00 [queued]>
[2024-01-17, 18:15:10 +08] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_consumer.print_triggering_dataset_events dataset_triggered__2024-01-17T10:15:09.140192+00:00 [queued]>
[2024-01-17, 18:15:10 +08] {taskinstance.py:2171} INFO - Starting attempt 1 of 1
[2024-01-17, 18:15:10 +08] {taskinstance.py:2192} INFO - Executing <Task(_PythonDecoratedOperator): print_triggering_dataset_events> on 2024-01-17 10:15:09.140192+00:00
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:60} INFO - Started process 2656 to run task
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'dag_consumer', 'print_triggering_dataset_events', 'dataset_triggered__2024-01-17T10:15:09.140192+00:00', '--job-id', '1641', '--raw', '--subdir', 'DAGS_FOLDER/testdag.py', '--cfg-path', '/tmp/tmpeer76u97']
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:88} INFO - Job 1641: Subtask print_triggering_dataset_events
[2024-01-17, 18:15:10 +08] {warnings.py:109} WARNING - /home/***/.local/lib/python3.8/site-packages/***/settings.py:195: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
  SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
[2024-01-17, 18:15:10 +08] {task_command.py:423} INFO - Running <TaskInstance: dag_consumer.print_triggering_dataset_events dataset_triggered__2024-01-17T10:15:09.140192+00:00 [running]> on host 4e6c155ede93
[2024-01-17, 18:15:10 +08] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dag_consumer' AIRFLOW_CTX_TASK_ID='print_triggering_dataset_events' AIRFLOW_CTX_EXECUTION_DATE='2024-01-17T10:15:09.140192+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='dataset_triggered__2024-01-17T10:15:09.140192+00:00'
[2024-01-17, 18:15:10 +08] {logging_mixin.py:188} INFO - Task result:  OK
[2024-01-17, 18:15:10 +08] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 242, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/testdag.py", line 38, in print_triggering_dataset_events
    print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test'))
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 487, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 959, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 995, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/strategies.py", line 863, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DatasetEvent at 0x7f66e6521eb0> is not bound to a Session; lazy load operation of attribute 'source_task_instance' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
[2024-01-17, 18:15:10 +08] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=dag_consumer, task_id=print_triggering_dataset_events, execution_date=20240117T101509, start_date=20240117T101510, end_date=20240117T101510
[2024-01-17, 18:15:10 +08] {standard_task_runner.py:107} ERROR - Failed to execute job 1641 for task print_triggering_dataset_events (Parent instance <DatasetEvent at 0x7f66e6521eb0> is not bound to a Session; lazy load operation of attribute 'source_task_instance' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3); 2656)
[2024-01-17, 18:15:10 +08] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-17, 18:15:10 +08] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check

In the UI

With two Datasets: error (screenshots 2024-01-17_17-50-52, 2024-01-17_17-51-09, 2024-01-17_17-51-27)

With one Dataset: OK (screenshots 2024-01-17_17-55-05, 2024-01-17_17-54-56, 2024-01-17_17-54-37)

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

SoftDed added the area:core, kind:bug, and needs-triage labels on Jan 17, 2024

boring-cyborg bot commented Jan 17, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.


SoftDed commented Jan 19, 2024

I've found a workaround for this problem: manually creating a session and passing it to xcom_pull:

    @task
    def print_triggering_dataset_events(triggering_dataset_events=None, **kwargs):
        from airflow.utils.session import create_session

        with create_session() as session:  # <-- create a session explicitly
            for dataset, dataset_list in triggering_dataset_events.items():
                for dataset_event in dataset_list:
                    print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test', session=session))  # <-- and pass it to xcom_pull
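
My reading of the traceback (an assumption, not verified in the code): the first parameterless xcom_pull call opens and closes its own session, which detaches the DatasetEvent objects that were loaded for the task context, so the next lazy load of source_task_instance fails; creating a session up front and passing it explicitly keeps the objects usable for the whole loop.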

RNHTTR added the area:datasets and good first issue labels and removed the needs-triage label on Jan 20, 2024

SoftDed commented Jan 23, 2024

Another observation: I wrote my own XCom backend. If it uses S3Hook and resolves its connection from the metadata database, everything breaks; if the connection is defined through environment variables instead, the 'hack' above works.
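
For context, a minimal sketch of that kind of backend (the bucket name, connection id, and key layout here are illustrative, not the actual backend; BaseXCom and S3Hook are the real Airflow classes):

from __future__ import annotations

import json
import os
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class S3XComBackend(BaseXCom):
    # Illustrative bucket name; configured via an environment variable in this sketch.
    BUCKET = os.environ.get("XCOM_S3_BUCKET", "my-xcom-bucket")

    @staticmethod
    def serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None, map_index=None, **kwargs):
        # Resolving "aws_default" may hit the metadata database; defining the connection
        # through the AIRFLOW_CONN_AWS_DEFAULT environment variable avoids that lookup.
        hook = S3Hook(aws_conn_id="aws_default")
        s3_key = f"xcom/{dag_id}/{run_id}/{task_id}/{key}/{uuid.uuid4()}.json"
        hook.load_string(json.dumps(value), key=s3_key, bucket_name=S3XComBackend.BUCKET, replace=True)
        # Only the S3 reference goes into the metadata database.
        return BaseXCom.serialize_value(s3_key)

    @staticmethod
    def deserialize_value(result):
        s3_key = BaseXCom.deserialize_value(result)
        hook = S3Hook(aws_conn_id="aws_default")
        return json.loads(hook.read_key(key=s3_key, bucket_name=S3XComBackend.BUCKET))

Such a backend is enabled with the xcom_backend option in the [core] section of airflow.cfg.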

boraberke (Contributor) commented:

Hi @utkarsharma2, I would like to give this a try. Could you please assign me?

boraberke (Contributor) commented:

Hi @SoftDed,

I think the problem happens after xcom_pull is called. If one collects the source_task_instances first and then calls xcom_pull, it works without any issue:

    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        task_instances = []
        for dataset, dataset_list in triggering_dataset_events.items():
            for dataset_event in dataset_list:
                task_instances.append(dataset_event.source_task_instance)
        for task_instance in task_instances:
            print('Task instance: ', task_instance.xcom_pull(task_ids='test'))

I am not sure how to fix this but wanted to share this observation as well.

eladkal removed this from the Airflow 2.9.3 milestone on Jun 9, 2024