Scheduler crashes w/ divide by 0 error when expanding with empty list #30647

@luederm

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

I have a task that returns a list of items, and a downstream task group is expanded over that list. The list is sometimes empty. When it is, the scheduler crashes with a ZeroDivisionError.
This is on Airflow 2.5.1. The traceback below was captured by running dag.test() on the DAG file.

Traceback (most recent call last):
  File "/data/srv/airflow_builds/43ad18c873a6c1dab54c69896feb242823302142/dags/pipeline_input_to_s3.py", line 136, in <module>
    dag.test()
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dag.py", line 2533, in test
    schedulable_tis, _ = dr.update_state(session=session)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dagrun.py", line 563, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dagrun.py", line 793, in _get_ready_tis
    if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1070, in are_dependencies_met
    for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1091, in get_failed_dep_statuses
    for dep_status in dep.get_dep_statuses(self, session, dep_context):
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 107, in get_dep_statuses
    yield from self._get_dep_statuses(ti, session, cxt)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 93, in _get_dep_statuses
    yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 219, in _evaluate_trigger_rule
    .filter(or_(*_iter_upstream_conditions()))
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 191, in _iter_upstream_conditions
    map_indexes = _get_relevant_upstream_map_indexes(upstream_id)
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 138, in _get_relevant_upstream_map_indexes
    return ti.get_relevant_upstream_map_indexes(
  File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2652, in get_relevant_upstream_map_indexes
    ancestor_map_index = self.map_index * ancestor_ti_count // ti_count
ZeroDivisionError: integer division or modulo by zero
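
For illustration, the arithmetic in the last frame can be reproduced outside Airflow. This is only a sketch: the standalone function below mirrors the expression shown in the traceback, and the guarded variant is a hypothetical workaround for discussion, not the actual Airflow code or fix. It assumes that ti_count is 0 when the expansion input is an empty list, which is what the error suggests.

```python
from typing import Optional


def ancestor_map_index(map_index: int, ancestor_ti_count: int, ti_count: int) -> int:
    """Mirror the expression from the last traceback frame.

    Evaluated left to right: (map_index * ancestor_ti_count) // ti_count.
    """
    return map_index * ancestor_ti_count // ti_count


def guarded_ancestor_map_index(
    map_index: int, ancestor_ti_count: int, ti_count: int
) -> Optional[int]:
    """Hypothetical guard (an assumption, not Airflow's fix).

    Returns None when there are no mapped task instances to divide by,
    so a caller could treat the upstream as having nothing relevant.
    """
    if ti_count == 0:
        return None
    return ancestor_map_index(map_index, ancestor_ti_count, ti_count)


# Non-empty expansion: the division is well defined.
print(ancestor_map_index(4, 2, 6))  # (4 * 2) // 6 == 1

# Empty expansion: a zero ti_count raises regardless of the numerator.
try:
    ancestor_map_index(0, 0, 0)
except ZeroDivisionError as exc:
    print(f"ZeroDivisionError: {exc}")

# The guarded variant returns None instead of raising.
print(guarded_ancestor_map_index(0, 0, 0))
```

Whether returning None, skipping the downstream tasks, or something else is the right behaviour is a design decision for the maintainers; the sketch only shows where the division by zero comes from.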

What you think should happen instead

Downstream tasks should be skipped.

How to reproduce

Expand a task group over the output of an upstream task that returns an empty list.

Operating System

macOS and Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
