
Simultaneous tasks completion causing missing dataset triggered dag runs #35870

boushphong opened this issue Nov 26, 2023 · 4 comments
Labels: area:core, area:datasets, kind:bug

@boushphong (Contributor) commented Nov 26, 2023

Apache Airflow version

2.7.3

What happened

When multiple Airflow tasks finish at about the same time, and those tasks are responsible for triggering another DAG via a Dataset, some of the dataset-triggered DAG runs go missing.

For example:
If a DAG has 2 tasks that trigger another DAG via a Dataset, there should be 2 dataset-triggered DAG runs for the triggered DAG.
From my observation, if the 2 tasks finish at about the same time, triggered DAG runs go missing, so only 1 DAG run may be triggered instead of 2.

What you think should happen instead

The number of dataset-triggered DAG runs should equal the number of tasks (that update the dataset) that finish at the same time.

How to reproduce

Code to reproduce:

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.datasets import Dataset

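# Producer whose two outlet tasks finish at clearly different times (3s vs 5s sleep);
# both dataset updates result in a run of trigger_v1, as expected.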
with DAG(
        "dataset_triggered_runs",
        start_date=datetime.datetime(2022, 1, 1),
        schedule="0 0 * * *",
        catchup=False,
) as dataset_triggered_runs:
    bash_sleep = BashOperator(
        task_id="bash_sleep", bash_command="sleep 3", outlets=[Dataset("test")]
    )

    bash_sleep_2 = BashOperator(
        task_id="bash_sleep_2", bash_command="sleep 5", outlets=[Dataset("test")]
    )

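# Producer whose two outlet tasks finish at almost the same time (plain echo);
# this is the case where only one dataset-triggered run of trigger_v1 shows up.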
with DAG(
        "missing_dataset_triggered_runs",
        start_date=datetime.datetime(2022, 1, 1),
        schedule="0 0 * * *",
        catchup=False,
) as missing_dataset_triggered_runs:
    echo1 = BashOperator(
        task_id="bash_sleep", bash_command="echo 1", outlets=[Dataset("test")]
    )

    echo2 = BashOperator(
        task_id="bash_sleep_2", bash_command="echo 2", outlets=[Dataset("test")]
    )

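# Consumer DAG, scheduled on Dataset("test"); each dataset event from the
# producers above should queue a run of this DAG.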
with DAG(
        "trigger_v1",
        start_date=datetime.datetime(2022, 1, 1),
        schedule=[Dataset("test")],
) as dag2:
    dataset_trigger_task = EmptyOperator(task_id="empty_task_2")
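
After running either producer DAG, one way to check the result (a minimal sketch using Airflow's ORM session helpers; run it inside the Airflow environment, or simply inspect the scheduler logs as below) is to count the dataset-triggered runs of trigger_v1:

from airflow.models import DagRun
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType

# Count how many runs of trigger_v1 were created by dataset events.
with create_session() as session:
    n_runs = (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == "trigger_v1",
            DagRun.run_type == DagRunType.DATASET_TRIGGERED,
        )
        .count()
    )
    print(f"dataset-triggered runs of trigger_v1: {n_runs}")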

The dataset_triggered_runs DAG has 2 tasks (that trigger the dataset run) finishing at different times, and there are 2 dataset-triggered DAG runs, which is expected.

[2023-11-27T03:19:22.897+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:19:21.217750+00:00: dataset_triggered__2023-11-26T20:19:21.217750+00:00, state:running, queued_at: 2023-11-26 20:19:21.809036+00:00. externally triggered: False> successful
[2023-11-27T03:19:22.897+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:19:21.217750+00:00, run_id=dataset_triggered__2023-11-26T20:19:21.217750+00:00, run_start_date=2023-11-26 20:19:21.851541+00:00, run_end_date=2023-11-26 20:19:22.897392+00:00, run_duration=1.045851, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017
[2023-11-27T03:19:23.490+0700] {dagrun.py:653} INFO - Marking run <DagRun dataset_triggered_runs @ 2023-11-26 20:19:17.627907+00:00: manual__2023-11-26T20:19:17.627907+00:00, state:running, queued_at: 2023-11-26 20:19:17.637778+00:00. externally triggered: True> successful
[2023-11-27T03:19:23.490+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=dataset_triggered_runs, execution_date=2023-11-26 20:19:17.627907+00:00, run_id=manual__2023-11-26T20:19:17.627907+00:00, run_start_date=2023-11-26 20:19:17.888672+00:00, run_end_date=2023-11-26 20:19:23.490855+00:00, run_duration=5.602183, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=d60293c69d62b7a5aeb3cf3d884b9693
[2023-11-27T03:19:23.500+0700] {scheduler_job_runner.py:685} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='dataset_triggered_runs', task_id='bash_sleep_2', run_id='manual__2023-11-26T20:19:17.627907+00:00', try_number=1, map_index=-1)
[2023-11-27T03:19:23.503+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=dataset_triggered_runs, task_id=bash_sleep_2, run_id=manual__2023-11-26T20:19:17.627907+00:00, map_index=-1, run_start_date=2023-11-26 20:19:18.027568+00:00, run_end_date=2023-11-26 20:19:23.206466+00:00, run_duration=5.178898, state=success, executor_state=success, try_number=1, max_tries=0, job_id=189, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:19:17.914731+00:00, queued_by_job_id=1, pid=23848
[2023-11-27T03:19:23.678+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:19:23.214909+00:00: dataset_triggered__2023-11-26T20:19:23.214909+00:00, state:running, queued_at: 2023-11-26 20:19:23.471484+00:00. externally triggered: False> successful
[2023-11-27T03:19:23.678+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:19:23.214909+00:00, run_id=dataset_triggered__2023-11-26T20:19:23.214909+00:00, run_start_date=2023-11-26 20:19:23.479981+00:00, run_end_date=2023-11-26 20:19:23.678956+00:00, run_duration=0.198975, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017

However, the missing_dataset_triggered_runs DAG has 2 tasks (that trigger the dataset run) finishing at about the same time, and there is only 1 dataset-triggered DAG run, which is unexpected. This is very likely a bug.

[2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=missing_dataset_triggered_runs, task_id=bash_sleep_2, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1, run_start_date=2023-11-26 20:21:41.184677+00:00, run_end_date=2023-11-26 20:21:41.356996+00:00, run_duration=0.172319, state=success, executor_state=success, try_number=1, max_tries=0, job_id=190, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24351
[2023-11-27T03:21:42.147+0700] {scheduler_job_runner.py:722} INFO - TaskInstance Finished: dag_id=missing_dataset_triggered_runs, task_id=bash_sleep, run_id=scheduled__2023-11-25T00:00:00+00:00, map_index=-1, run_start_date=2023-11-26 20:21:41.184319+00:00, run_end_date=2023-11-26 20:21:41.345462+00:00, run_duration=0.161143, state=success, executor_state=success, try_number=1, max_tries=0, job_id=191, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2023-11-26 20:21:41.048606+00:00, queued_by_job_id=1, pid=24350
[2023-11-27T03:21:43.175+0700] {dagrun.py:653} INFO - Marking run <DagRun trigger_v1 @ 2023-11-26 20:21:41.356742+00:00: dataset_triggered__2023-11-26T20:21:41.356742+00:00, state:running, queued_at: 2023-11-26 20:21:42.118410+00:00. externally triggered: False> successful
[2023-11-27T03:21:43.175+0700] {dagrun.py:704} INFO - DagRun Finished: dag_id=trigger_v1, execution_date=2023-11-26 20:21:41.356742+00:00, run_id=dataset_triggered__2023-11-26T20:21:41.356742+00:00, run_start_date=2023-11-26 20:21:42.126641+00:00, run_end_date=2023-11-26 20:21:43.175274+00:00, run_duration=1.048633, state=success, external_trigger=False, run_type=dataset_triggered, data_interval_start=2023-11-25 00:00:00+00:00, data_interval_end=2023-11-26 00:00:00+00:00, dag_hash=f4426a33d21238a4f41898744b2ce017

Operating System

Docker

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

boushphong added the area:core, kind:bug, and needs-triage labels Nov 26, 2023
@nathadfield (Collaborator)

@boushphong Thanks for logging this; I have been able to replicate it. I see that you indicated you'd be willing to submit a PR, so shall I assign you to this issue?

nathadfield added the pending-response and area:datasets labels Nov 27, 2023
@boushphong (Contributor, Author)

Sure thing. Let me work on this.

nathadfield removed the pending-response and needs-triage labels Nov 27, 2023
@boushphong (Contributor, Author)

@nathadfield Just submitted a pull request. It's a draft PR for now:
https://github.com/apache/airflow/pull/36032/files#diff-4253adbb36bfb93cb75ab00c7d509518134e5bf1ad16473b64a2a6d8fa456c92L208-L214

I went with the idea of removing the primary key on the dataset_dag_run_queue table, so that when we insert a new record into the table, as in (code):

stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()

we won't face a conflict. Currently, if a DAG has multiple tasks updating the same Dataset, we insert 2 records that conflict with each other due to the primary key constraint, and on_conflict_do_nothing() silently drops the second one.
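
For reference, a minimal self-contained sketch of that conflict (plain SQLAlchemy + SQLite here, not Airflow's actual model; the table and column names only paraphrase dataset_dag_run_queue):

from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.dialects.sqlite import insert
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class RunQueueSketch(Base):
    """Stand-in for dataset_dag_run_queue, keyed by (dataset_id, target_dag_id)."""
    __tablename__ = "run_queue_sketch"
    dataset_id = Column(Integer, primary_key=True)
    target_dag_id = Column(String(250), primary_key=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Two tasks of the same DAG update the same dataset before the scheduler
    # drains the queue, so both inserts target the same primary key.
    for _ in range(2):
        session.execute(
            insert(RunQueueSketch)
            .values(dataset_id=1, target_dag_id="trigger_v1")
            .on_conflict_do_nothing()
        )
    session.commit()
    n = session.execute(select(func.count()).select_from(RunQueueSketch)).scalar_one()
    print(n)  # 1 -> only one queue entry survives, hence only one triggered run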

Just outlining my idea before committing more time to this solution. WDYT?
By the way, if I make changes to the model, do I have to modify the migrations package, and if so, where should I look?
Cheers!

@nathadfield (Collaborator)

@boushphong I'm probably not the best person to comment on this, as I don't really know much about this aspect of Airflow. Perhaps tag some of the people who have also worked in this area on the PR?

boushphong removed their assignment Apr 22, 2024