Description
Apache Airflow version
2.11.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
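The mapping length in expandinput is determined incorrectly when expanding over one key of a task with multiple_outputs=True: it is taken to be the number of values returned by that task, instead of the number of elements in the sequence being mapped over (which is itself one of the values the task returns).
Note that all_lengths is always 2, matching the number of keys returned by determine_run_params in the reproduction below, regardless of the contents of value.
Depending on how the computed map length compares with the actual number of elements, the DAG either fails with the error below (computed length larger), works correctly (lengths equal), or silently skips the later elements (computed length smaller).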
  File ".../.venv/lib64/python3.11/site-packages/airflow/models/expandinput.py", line 197, in _expand_mapped_field
    return value[found_index]
           ~~~~~^^^^^^^^^^^^^
IndexError: list index out of range
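The three outcomes above follow from looking up each mapped element by index while the number of mapped instances comes from a different (wrong) length. The sketch below is a hypothetical, single-key model of that lookup, not Airflow's code: `simulate_expansion` is an illustrative name, and `computed_length` stands in for the value the issue observes in all_lengths.

```python
from __future__ import annotations

from typing import Sequence


def simulate_expansion(values: Sequence[str], computed_length: int) -> list[str]:
    """Hypothetical single-key model of mapped expansion (not Airflow's code).

    One mapped instance is created per index in range(computed_length), and
    each instance fetches its element by index, much like value[found_index]
    in the traceback above.
    """
    processed = []
    for map_index in range(computed_length):
        # Raises IndexError when computed_length > len(values).
        processed.append(values[map_index])
    return processed


two_outputs = 2  # stands in for the two keys returned by determine_run_params

# Fewer elements than the computed length: IndexError.
try:
    simulate_expansion(["target0"], two_outputs)
except IndexError as exc:
    print("error:", exc)  # prints "error: list index out of range"

# Equal lengths: every element is processed.
print(simulate_expansion(["target0", "target1"], two_outputs))

# More elements than the computed length: the trailing element is skipped.
print(simulate_expansion(["target0", "target1", "target2"], two_outputs))
```

In this model the last two calls both print `['target0', 'target1']`, which is the "not processing the later elements" case.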
What you think should happen instead?
all_lengths should reflect the number of elements in the sequence being mapped over (here, the length of targets), not the number of values returned by the upstream task.
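To make the mismatch concrete, here is a plain-Python sketch of the two candidate lengths for a multiple_outputs return value like the one from determine_run_params. Both function names are hypothetical; `observed_map_length` only reproduces what the issue reports (the size of the whole returned dict) and is not a claim about how Airflow computes the length internally.

```python
from __future__ import annotations

from typing import Any, Mapping


def observed_map_length(return_value: Mapping[str, Any]) -> int:
    """Length as reported in the issue: the number of keys in the return dict."""
    return len(return_value)


def expected_map_length(return_value: Mapping[str, Any], key: str) -> int:
    """Length the issue expects: the size of the sequence stored under key."""
    return len(return_value[key])


run_params = {
    "targets": [f"target{n}" for n in range(5)],
    "date": "2020-01-01",  # placeholder for pendulum.now()
}

print(observed_map_length(run_params))             # 2: always the number of keys
print(expected_map_length(run_params, "targets"))  # 5: the number of targets
```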
How to reproduce
from __future__ import annotations

import pendulum

from airflow.decorators import task, task_group
from airflow.models.dag import DAG

TYPE_CHECKING: bool = False
if TYPE_CHECKING:
    from datetime import datetime
    from typing import Any


@task.python(multiple_outputs=True)
def determine_run_params() -> dict[str, Any]:
    return {
        "targets": [f"target{n}" for n in range(1)],  # <<<<<<<<<< Modify for different results
        "date": pendulum.now(),
        # "str1": "1",  # Uncomment these for different results
        # "str2": "2",
        # "str3": "3",
    }


@task.python
def do_something(
    target: str,
    date: datetime,
) -> str:
    return f"All good for {target} on {date}!"


@task_group
def process(
    date: datetime,
    target: str,
) -> None:
    do_something(target, date)


with DAG(
    "mcve",
    start_date=pendulum.datetime(2020, 1, 1),
) as dag:
    run_params = determine_run_params()
    process.partial(date=run_params["date"]).expand(target=run_params["targets"])

if __name__ == "__main__":
    dag.test()
Workaround
Introduce an intermediate passthrough task and route the mapped sequence through it:
@task.python
def unpack(something: list[str]) -> list[str]:
    return something

- process.partial(date=run_params["date"]).expand(target=run_params["targets"])
+ process.partial(date=run_params["date"]).expand(target=unpack(run_params["targets"]))
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
The code works correctly when the number of mapped elements (the length of targets) equals the number of outputs of determine_run_params. Perhaps Airflow's tests for this functionality happen to hit exactly this case.
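Why a test can miss this: if the computed map length is always the number of upstream outputs, only one target count per output count behaves correctly. The sweep below is a hypothetical model of the reported behaviour, not Airflow code; `expansion_outcome` is an illustrative name, and the model assumes the computed length equals the number of outputs, as described in this issue.

```python
from __future__ import annotations


def expansion_outcome(num_targets: int, num_outputs: int) -> str:
    """Classify the outcome under the reported behaviour (a model, not Airflow).

    The map length is taken to be num_outputs (keys returned by the upstream
    task), while num_targets elements actually need processing.
    """
    if num_outputs > num_targets:
        return "IndexError"
    if num_outputs < num_targets:
        return f"skips {num_targets - num_outputs}"
    return "ok"


# Sweep target counts against the two outputs of determine_run_params.
for n in range(1, 5):
    print(n, expansion_outcome(n, num_outputs=2))
# 1 IndexError
# 2 ok
# 3 skips 1
# 4 skips 2
```

A test fixture whose mapped list happens to have as many elements as the upstream task has outputs lands on the single "ok" row, so varying the list length (or the number of extra keys such as str1 to str3) is what exposes the bug.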
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct

