
Map length determined incorrectly when mapping over an output of a multi-output task #51109

@Dev-iL

Description


Apache Airflow version

2.11.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The mapping length in expandinput.py is determined incorrectly: it is taken to be the number of values returned by a multiple_outputs task, instead of the number of elements in the sequence we map over (which is one of the values returned by that task).

Note how all_lengths is always 2 regardless of the contents of value:

[screenshots omitted]

Depending on how the determined map size compares to the actual one, the DAG either raises the error below, works correctly, or ends up not processing the later elements.

.../.venv/lib64/python3.11/site-packages/airflow/models/expandinput.py", line 197, in _expand_mapped_field
    return value[found_index]
           ~~~~~^^^^^^^^^^^^^
IndexError: list index out of range
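
To make the mismatch concrete, here is a minimal sketch (not Airflow source; the variable names are hypothetical), based on the reproduction DAG below. The upstream XCom is a two-key dict, and the expansion length appears to be taken from that dict rather than from the targets list:

run_params = {"targets": ["target0"], "date": "2020-01-01"}  # XCom of the multiple_outputs task
observed_map_length = len(run_params)             # 2 -> matches the reported all_lengths
expected_map_length = len(run_params["targets"])  # 1 -> number of mapped task instances we actually want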

What you think should happen instead?

all_lengths should correctly reflect the number of elements we're mapping over.

How to reproduce

from __future__ import annotations

import pendulum
from airflow.decorators import task, task_group
from airflow.models.dag import DAG

TYPE_CHECKING: bool = False
if TYPE_CHECKING:
    from datetime import datetime
    from typing import Any


@task.python(multiple_outputs=True)
def determine_run_params() -> dict[str, Any]:
    return {
        "targets": [f"target{n}" for n in range(1)],  # <<<<<<<<<< Modify for different results
        "date": pendulum.now(),
        # "str1": "1",  # Uncomment these for different results
        # "str2": "2",
        # "str3": "3",
    }


@task.python
def do_something(
    target: str,
    date: datetime,
) -> str:
    return f"All good for {target} on {date}!"


@task_group
def process(
    date: datetime,
    target: str,
) -> None:
    do_something(target, date)


with DAG(
    "mcve",
    start_date=pendulum.datetime(2020, 1, 1),
) as dag:
    run_params = determine_run_params()
    process.partial(date=run_params["date"]).expand(target=run_params["targets"])


if __name__ == "__main__":
    dag.test()

Workaround

Introduce an intermediate passthrough task:

@task.python
def unpack(something: list[str]) -> list[str]:
    return something

- process.partial(date=run_params["date"]).expand(target=run_params["targets"])
+ process.partial(date=run_params["date"]).expand(target=unpack(run_params["targets"]))
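
This presumably works because the expansion then measures its length against the plain-list XCom returned by unpack, rather than against the multi-output dict returned by determine_run_params.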

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

When the number of map elements (length of targets) is equal to the number of outputs from determine_run_params, the code works correctly. Perhaps Airflow's tests related to this functionality are hitting this exact sweet spot.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
