
Trigger Rule ONE_FAILED does not work in task group with mapped tasks #34023

@benbuckman

Apache Airflow version

2.7.0

What happened

I have the following DAG:

from __future__ import annotations
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule

@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass

@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
  • fake_sensor simulates a task that raises an exception. (It could be a @task.sensor raising an AirflowSensorTimeout; it doesn't matter, the behavior is the same.)
  • handle_failed_delivery's TriggerRule.ONE_FAILED means it is supposed to run as soon as any of its upstream tasks fails. Its direct upstream is deliver_record, which becomes upstream_failed when fake_sensor fails, so handle_failed_delivery should run.

But this does not work: handle_failed_delivery is skipped, and (based on the UI) it is skipped very early, before it can know whether its upstream tasks have succeeded or failed.
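To make the expected behavior concrete, here is a small pure-Python model of how a ONE_FAILED task instance could decide what to do from the states of its direct upstream tasks. It is a simplified, hypothetical sketch based on the documented meaning of one_failed (run as soon as at least one upstream has failed, without waiting for the others), and it assumes an upstream_failed state counts as a failure. It is not Airflow's scheduler code; the State enum and one_failed_decision function are illustrative names only.

```python
from enum import Enum


class State(str, Enum):
    # Hypothetical subset of task-instance states; names mirror Airflow's,
    # but this enum is only for illustration.
    NONE = "none"  # not scheduled yet / still waiting on its own upstreams
    RUNNING = "running"
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"


def one_failed_decision(upstream_states: list[State]) -> str:
    """Hypothetical model of a ONE_FAILED trigger rule for one task instance.

    "run"  as soon as any direct upstream is failed or upstream_failed,
    "skip" only once every direct upstream finished as success/skipped,
    "wait" in every other case (some upstream has not resolved yet).
    """
    if any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream_states):
        return "run"
    if all(s in (State.SUCCESS, State.SKIPPED) for s in upstream_states):
        return "skip"
    return "wait"


# One mapped instance of the task group: handle_failed_delivery's only
# direct upstream is deliver_record.
print(one_failed_decision([State.NONE]))             # wait
print(one_failed_decision([State.UPSTREAM_FAILED]))  # run
print(one_failed_decision([State.SUCCESS]))          # skip
```

Under this model, while deliver_record is still unresolved the answer is "wait", so skipping handle_failed_delivery at that point, as happens in the task-group run, would be premature.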

Here's what I see, progressively (see How to reproduce below for how I got this):

(Screenshots from 2023-09-01, 3:26 PM, in order: started; skipped too early; fake_sensor about to fail; done, didn't run.)

If I remove the task group and instead expand each task directly:

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )

then it does the right thing:

(Screenshots from 2023-09-01, 3:46 PM, in order: started; waiting; done, triggered correctly.)
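One visible difference between the two DAGs is the shape of the dependencies between mapped instances. The sketch below is a hypothetical pure-Python model of which upstream map indices each handle_failed_delivery instance would wait on. It assumes (this is not established in this report) that tasks inside an expanded task group pair up by map index, while separately expanded tasks wait on every instance of their upstream task. relevant_upstream_indices is an illustrative name, not an Airflow API.

```python
def relevant_upstream_indices(
    downstream_index: int, upstream_count: int, same_mapped_group: bool
) -> list[int]:
    """Hypothetical model: upstream map indices a mapped task instance waits on.

    Assumption: inside an expanded task group, instance i depends only on the
    upstream instance with the same map index; for separately expanded tasks,
    every instance depends on all instances of its upstream task.
    """
    if same_mapped_group:
        return [downstream_index]
    return list(range(upstream_count))


records = ["a", "b", "c"]
for i, record in enumerate(records):
    grouped = relevant_upstream_indices(i, len(records), same_mapped_group=True)
    flat = relevant_upstream_indices(i, len(records), same_mapped_group=False)
    print(f"handle_failed_delivery[{i}] ({record}): task group -> {grouped}, flat -> {flat}")
```

If this assumption holds, the task-group version evaluates ONE_FAILED per map index, which is a different code path from the flat version that behaves correctly.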

What you think should happen instead

The behavior with the task group should be the same as without the task group: the handle_failed_delivery task with trigger_rule=TriggerRule.ONE_FAILED should be run when the upstream fake_sensor task fails.

How to reproduce

  1. Put the above DAG at a local path, /tmp/dags/demo_trigger_one_failed.py.

  2. docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash

  3. In the container:

    airflow db init
    airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
    airflow scheduler --daemon
    airflow webserver
    
  4. Open http://localhost:8080 on the host. Login with airflow / airflow. Run the DAG.

I tested this with:

  • apache/airflow:2.6.2-python3.10
  • apache/airflow:2.6.3-python3.10
  • apache/airflow:2.7.0-python3.10

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

n/a

Deployment

Other Docker-based deployment

Deployment details

This can be reproduced using standalone Docker images, see Repro steps above.

Anything else

I wonder if this is related to (or fixed by?) #33446 -> #33732. (The latter was "added to the Airflow 2.7.1 milestone 3 days ago." I can try to install that pre-release code in the container and see if it's fixed.)
Edit: nope, not fixed.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
