Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Consider the following structure (see below for a full code example):
DAG
- Parent task group (expanded through dynamic task mapping)
  - Child task group
    - Task 1 (alter_input)
    - Task 2 (print_task)
Task 2 receives as input the result from task 1. Both are decorated with @task, and the task groups with @task_group. The inner task group looks like this:
def inner_task_group(orig_input: str) -> None:
    altered_input = alter_input(orig_input)
    print_task(orig_input, altered_input)
Then the print task's logs show this:
No XCom value found; defaulting to None.: key="return_value": dag_id="test_dag__feat__nested_test_dag": task_id="expandable_task_group.inner_task_group.alter_input": run_id="manual__2025-07-04T14:56:47.815002+00:00": map_index=null: source="task"
Note that the lookup uses "map_index=null". As a result, no XCom value is found and altered_input arrives in print_task as None instead of the altered string.
Everything works fine for tasks in the parent task group, and everything works fine if the parent is not expanded.
The same use case works with Cloud Composer using Airflow version 2.10.5-build.0.
What you think should happen instead?
The argument should be passed correctly using TaskFlow syntax. For this, the correct map index should be used when retrieving the input arguments from XCom.
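To make the expectation concrete, here is a minimal sketch of the lookup semantics in plain Python. It does not use Airflow at all: ToyXComStore, push, and pull are hypothetical names invented for this illustration, and the assumption that values are keyed by (task_id, map_index) is inferred from the log line above, not taken from the Airflow source.

```python
from typing import Optional

# Toy stand-in for an XCom table. This is NOT Airflow's storage or API;
# every name here is hypothetical and exists only for this illustration.
XComKey = tuple[str, Optional[int]]


class ToyXComStore:
    def __init__(self) -> None:
        self._rows: dict[XComKey, str] = {}

    def push(self, task_id: str, map_index: Optional[int], value: str) -> None:
        # Assumption: each mapped instance stores its result under its own index.
        self._rows[(task_id, map_index)] = value

    def pull(self, task_id: str, map_index: Optional[int]) -> Optional[str]:
        # A miss falls back to None, mirroring "defaulting to None" in the log.
        return self._rows.get((task_id, map_index))


UPSTREAM = "expandable_task_group.inner_task_group.alter_input"

store = ToyXComStore()
for index, param in enumerate(["One", "Two", "Three"]):
    store.push(UPSTREAM, index, f"{param}_Altered")

# Reported behaviour: the lookup is made without a map index and finds nothing.
observed = store.pull(UPSTREAM, map_index=None)

# Expected behaviour: mapped instance 1 of print_task reads upstream index 1.
expected = store.pull(UPSTREAM, map_index=1)

print(observed, expected)
```

In this toy model the only difference between the failing and the working lookup is the map index used for the upstream task, which is what the log message suggests is going wrong for the nested task group.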
How to reproduce
Minimal non-working example:
import logging

from airflow.decorators import dag
from airflow.decorators import task
from airflow.decorators import task_group


@task
def get_params() -> list[str]:
    return ["One", "Two", "Three"]


@task()
def alter_input(inp: str) -> str:
    return f"{inp}_Altered"


@task
def print_task(orig_input: str, altered_input: str) -> None:
    logging.error(f"Orig input: {orig_input}")
    logging.error(f"Altered input: {altered_input}")  # -> this is not passed


@task_group
def inner_task_group(orig_input: str) -> None:
    altered_input = alter_input(orig_input)
    print_task(orig_input, altered_input)


@task_group
def expandable_task_group(param: str) -> None:
    tg = inner_task_group(param)
    tg


@dag(dag_id="test_dag")
def test_dag() -> None:
    params = get_params()
    tg = expandable_task_group.expand(param=params)
    tg


test_dag()
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
Can't think of any relevant customization that I did there...
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct