Apache Airflow version
2.7.0
What happened
I have the following DAG:
```python
from __future__ import annotations

from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule


@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass


@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )


@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
```

`fake_sensor` is simulating a task that raises an exception. (It could be a `@task.sensor` raising an `AirflowSensorTimeout`; it doesn't matter, the behavior is the same.) `handle_failed_delivery`'s `TriggerRule.ONE_FAILED` means it is supposed to run whenever any task upstream fails. So when `fake_sensor` fails, `handle_failed_delivery` should run.
But this does not work. `handle_failed_delivery` is skipped, and (based on the UI) it is skipped very early, before it can know whether the upstream tasks succeeded or failed.
Here's what I see, progressively (see How to reproduce below for how I got this):
(Screenshots, in order: started; `handle_failed_delivery` skipped too early; `fake_sensor` about to fail; done, `handle_failed_delivery` didn't run.)
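The same progression can also be checked from the CLI instead of the UI. A minimal sketch (the run id below is only an example; substitute the actual run id of the triggered DAG run):

```bash
# Sketch: list the state of every task instance in the run, to confirm that
# deliver_records.handle_failed_delivery ends up "skipped" instead of running.
# "manual__2023-01-01T00:00:00+00:00" is only an example run id.
airflow tasks states-for-dag-run demo_trigger_one_failed "manual__2023-01-01T00:00:00+00:00"
```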
If I remove the task group and instead do,
```python
@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )
```

then it does the right thing:
(Screenshots, in order: started; waiting; done, `handle_failed_delivery` triggered correctly.)
What you think should happen instead
The behavior with the task group should be the same as without the task group: the `handle_failed_delivery` task with `trigger_rule=TriggerRule.ONE_FAILED` should run when the upstream `fake_sensor` task fails.
How to reproduce
- Put the above DAG at a local path, `/tmp/dags/demo_trigger_one_failed.py`.

- ```bash
  docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash
  ```

- In the container:

  ```bash
  airflow db init
  airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
  airflow scheduler --daemon
  airflow webserver
  ```

- Open `http://localhost:8080` on the host. Log in with `airflow` / `airflow`. Run the DAG.
I tested this with:
- `apache/airflow:2.6.2-python3.10`
- `apache/airflow:2.6.3-python3.10`
- `apache/airflow:2.7.0-python3.10`
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
n/a
Deployment
Other Docker-based deployment
Deployment details
This can be reproduced using the standalone Docker images; see the How to reproduce steps above.
Anything else
I wonder if this is related to (or fixed by?) #33446 -> #33732? (The latter was "added to the Airflow 2.7.1 milestone 3 days ago." I can try to install that pre-release code in the container and see if it's fixed.)

Edit: nope, not fixed.
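For reference, a rough sketch of one way to install the unreleased code in the container for such a check. Installing straight from the git main branch is only an approximation of the 2.7.1 pre-release (the exact ref is an assumption), and building Airflow from source may need extra build tooling in the image:

```bash
# Sketch only: overwrite the 2.7.0 install inside the container with the
# current main branch. The ref to test (main vs. a 2.7.1 RC tag) is an
# assumption; building from source may require additional dependencies
# (e.g. node for the www assets).
pip install --upgrade "apache-airflow @ git+https://github.com/apache/airflow.git@main"
```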
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct






