map_index = null for passed parameters in tasks in child task group when parent task group is expanded #52881

@MarcusCramer91

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Consider the following structure (see below for a full code example):
DAG

  • Parent task group (expanded through dynamic task mapping)
    • Child task group
      • Task 1 (alter_input)
      • Task 2 (print_task)

Task 2 receives the result of task 1 as input. Both tasks are decorated with @task, and both task groups with @task_group. The inner task group looks like this:

def inner_task_group(orig_input: str) -> None:
    altered_input = alter_input(orig_input)
    print_task(orig_input, altered_input)

Then the print task's logs show this:

No XCom value found; defaulting to None.: key="return_value": dag_id="test_dag__feat__nested_test_dag": task_id="expandable_task_group.inner_task_group.alter_input": run_id="manual__2025-07-04T14:56:47.815002+00:00": map_index=null: source="task"

Note the "map_index=null": the XCom lookup for alter_input carries no map index, so no value is found and altered_input is passed as None.
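To illustrate why a lookup without a map index comes back empty, here is a minimal toy model in plain Python. It is not Airflow's XCom implementation: the in-memory store, `push_mapped_results`, and `pull` are hypothetical names, and the sketch only assumes that each mapped instance stores its result under its own map index.

```python
# Toy model only: this is NOT Airflow's XCom implementation.
from typing import Optional

# Hypothetical in-memory store: (task_id, map_index) -> value.
XComStore = dict[tuple[str, Optional[int]], str]

TASK_ID = "expandable_task_group.inner_task_group.alter_input"


def push_mapped_results(inputs: list[str]) -> XComStore:
    """Store one result per mapped instance, keyed by its map index."""
    return {(TASK_ID, i): f"{inp}_Altered" for i, inp in enumerate(inputs)}


def pull(store: XComStore, task_id: str, map_index: Optional[int]) -> Optional[str]:
    """Return the stored value, or None when no entry matches the key."""
    return store.get((task_id, map_index))


store = push_mapped_results(["One", "Two", "Three"])
missing = pull(store, TASK_ID, None)  # lookup as in the log: map_index=null
found = pull(store, TASK_ID, 1)       # lookup with a concrete map index
print(missing, found)
```

In this model the values exist, but only under concrete indexes, so a lookup keyed on "no index" misses all of them. That would match the "No XCom value found; defaulting to None" message, assuming the real lookup behaves similarly.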

Tasks placed directly in the parent task group work fine, and the nested layout works fine as long as the parent is not expanded.

The same use case works with Cloud Composer using Airflow version 2.10.5-build.0.

What you think should happen instead?

The parameter should be passed correctly using TaskFlow syntax. For this, the correct map index should be used to retrieve the input arguments from XCom.
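As a rough sketch of the expected result, the following plain Python (no Airflow involved) computes the argument pair that print_task should receive in each mapped instance. It reuses the alter_input body from the reproduction in this report without the decorator. `expected_print_task_args` is a hypothetical helper, and the sketch assumes map indexes are assigned 0, 1, 2 in list order.

```python
def alter_input(inp: str) -> str:
    # Same body as the @task in the reproduction, without the decorator.
    return f"{inp}_Altered"


def expected_print_task_args(params: list[str]) -> dict[int, tuple[str, str]]:
    """Map each map index to the (orig_input, altered_input) pair that
    print_task should receive in that mapped instance."""
    return {i: (p, alter_input(p)) for i, p in enumerate(params)}


expected = expected_print_task_args(["One", "Two", "Three"])
for map_index, (orig, altered) in expected.items():
    print(f"map_index={map_index}: orig_input={orig!r}, altered_input={altered!r}")
```

With the behavior reported here, the second element of every pair is None instead.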

How to reproduce

Minimal non-working example:

import logging

from airflow.decorators import dag
from airflow.decorators import task
from airflow.decorators import task_group


@task
def get_params() -> list[str]:
    return ["One", "Two", "Three"]


@task()
def alter_input(inp: str) -> str:
    return f"{inp}_Altered"


@task
def print_task(orig_input: str, altered_input: str) -> None:
    logging.error(f"Orig input: {orig_input}")
    logging.error(f"Altered input: {altered_input}")  # -> logs None: the value is not passed


@task_group
def inner_task_group(orig_input: str) -> None:
    altered_input = alter_input(orig_input)
    print_task(orig_input, altered_input)


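@task_group
def expandable_task_group(param: str) -> None:
    # Nest the child group inside the group that gets expanded.
    inner_task_group(param)


@dag(dag_id="test_dag")
def test_dag() -> None:
    params = get_params()
    # Expanding the parent group creates one mapped instance per list element.
    expandable_task_group.expand(param=params)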


test_dag()

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Can't think of any relevant customization that I did there...

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
