Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2730,6 +2730,14 @@ def tg2(inp):
:return: Specific map index or map indexes to pull, or ``None`` if we
want to "whole" return value (i.e. no mapped task groups involved).
"""
# This value should never be None since we already know the current task
# is in a mapped task group, and should have been expanded, despite that,
# we need to check that it is not None to satisfy Mypy.
# But this value can be 0 when we expand an empty list, for that it is
# necessary to check that ti_count is not 0 to avoid dividing by 0.
if not ti_count:
return None

# Find the innermost common mapped task group between the current task
# If the current task and the referenced task does not have a common
# mapped task group, the two are in different task mapping contexts
Expand All @@ -2738,12 +2746,6 @@ def tg2(inp):
if common_ancestor is None:
return None

# This value should never be None since we already know the current task
# is in a mapped task group, and should have been expanded. The check
# exists mainly to satisfy Mypy.
if ti_count is None:
return None

# At this point we know the two tasks share a mapped task group, and we
# should use a "partial" value. Let's break down the mapped ti count
# between the ancestor and further expansion happened inside it.
Expand Down
28 changes: 28 additions & 0 deletions tests/ti_deps/deps/test_trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,34 @@ def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]:
assert sorted(tis) == [("t3", -1)]


def test_upstream_in_mapped_group_when_mapped_tasks_list_is_empty(dag_maker, session):
from airflow.decorators import task, task_group

with dag_maker(session=session):

@task
def t(x):
return x

@task_group
def tg(x):
t1 = t.override(task_id="t1")(x=x)
return t.override(task_id="t2")(x=t1)

t2 = tg.expand(x=[])
t.override(task_id="t3")(x=t2)

dr: DagRun = dag_maker.create_dagrun()

def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]:
decision = dr.task_instance_scheduling_decisions(session=session)
return {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis}

# should return an empty dict
tis = _one_scheduling_decision_iteration()
assert tis == {}


def test_mapped_task_check_before_expand(dag_maker, session):
with dag_maker(session=session):

Expand Down