Dynamic Task Mapping skips tasks before upstream has started #28973

@inytar

Apache Airflow version

2.5.0

What happened

In some cases we see dynamically mapped tasks being skipped before their upstream tasks have started and before the dynamic count for the task can be calculated. We see this both locally with the LocalExecutor and on our cluster with the KubernetesExecutor.

To trigger the issue, multiple dynamic tasks need to merge into a common downstream task; see the images below for an example. Without the merge, the tasks run as expected. The number of dynamic tasks that will be created must also be unknown when the DAG run starts, for example because a task expands over the output of another dynamic task.

[Two screenshots of the test_skip DAG graph view in Airflow, taken 2023-01-16]

If the DAG, the task, or its upstream tasks are cleared, the skipped task runs as expected.

The issue exists on both Airflow 2.4.x and 2.5.0.

Happy to help debug this further & answer any questions!

What you think should happen instead

The tasks should run after upstream tasks are done.

How to reproduce

The following code is able to reproduce the issue on our side:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator

# With only one chained task, only 1 of the `skipped_tasks` is skipped.
# Adding extra chained tasks results in both `skipped_tasks` being skipped,
# but no earlier tasks are ever skipped.
CHAIN_TASKS = 1


@task()
def add(x, y):
    return x, y


with DAG(
    dag_id="test_skip",
    schedule=None,
    start_date=datetime(2023, 1, 13),
) as dag:

    init = EmptyOperator(task_id="init_task")
    final = EmptyOperator(task_id="final")

    for i in range(2):
        with TaskGroup(f"task_group_{i}") as tg:
            chain_task = [i]
            for j in range(CHAIN_TASKS):
                chain_task = add.partial(x=j).expand(y=chain_task)
            skipped_task = (
                add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
            )

        # Task isn't skipped if final (merging task) is removed.
        init >> tg >> final
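
For comparison, here is a minimal plain-Python sketch of the values each task group would be expected to produce if nothing were skipped. It does not import Airflow. The `expand` and `expected_group_results` helpers are hypothetical names introduced only for this illustration, and the sketch assumes that expanding over the output of a mapped task creates one mapped instance per upstream instance. It also ignores XCom serialization, which may turn the returned tuples into lists.

```python
# Plain-Python emulation of the reproduction DAG's expected results.
# Assumption: each expand() creates one mapped instance per input element.
CHAIN_TASKS = 1


def add(x, y):
    # Same body as the @task in the reproduction above.
    return x, y


def expand(x, ys):
    # Hypothetical stand-in for add.partial(x=x).expand(y=ys):
    # one call to add() per element of ys.
    return [add(x, y) for y in ys]


def expected_group_results(i, chain_tasks=CHAIN_TASKS):
    # Mirrors the body of one `task_group_{i}` in the DAG.
    chain = [i]
    for j in range(chain_tasks):
        chain = expand(j, chain)
    # This final step corresponds to the task with task_id="skipped".
    return expand(i, chain)


for i in range(2):
    print(i, expected_group_results(i))
```

Under these assumptions every step maps over exactly one element, so each `skipped` task should end up with a single mapped instance once its upstream has finished, rather than being skipped.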

Operating System

macOS

Versions of Apache Airflow Providers

This can be reproduced without any extra providers installed.

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
