-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Labels
Can't ReproduceThe problem cannot be reproducedThe problem cannot be reproducedarea:corekind:bugThis is a clearly a bugThis is a clearly a bug
Description
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
I have a task which returns a list of items. Sometimes this list can be empty. A downstream taskgroup is expanded with this list. When the list is empty, the scheduler crashes with a ZeroDivisionError.
This is in version 2.5.1.
Traceback (most recent call last):
File "/data/srv/airflow_builds/43ad18c873a6c1dab54c69896feb242823302142/dags/pipeline_input_to_s3.py", line 136, in <module>
dag.test()
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dag.py", line 2533, in test
schedulable_tis, _ = dr.update_state(session=session)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dagrun.py", line 563, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/dagrun.py", line 793, in _get_ready_tis
if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1070, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1091, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 107, in get_dep_statuses
yield from self._get_dep_statuses(ti, session, cxt)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 93, in _get_dep_statuses
yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 219, in _evaluate_trigger_rule
.filter(or_(*_iter_upstream_conditions()))
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 191, in _iter_upstream_conditions
map_indexes = _get_relevant_upstream_map_indexes(upstream_id)
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 138, in _get_relevant_upstream_map_indexes
return ti.get_relevant_upstream_map_indexes(
File "/data/srv/pyenv/versions/3.9.9/envs/43ad18c873a6c1dab54c69896feb242823302142/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2652, in get_relevant_upstream_map_indexes
ancestor_map_index = self.map_index * ancestor_ti_count // ti_count
ZeroDivisionError: integer division or modulo by zero
What you think should happen instead
Downstream tasks should be skipped.
How to reproduce
Expand a task group with an empty list.
Operating System
Mac+linux
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Metadata
Metadata
Assignees
Labels
Can't ReproduceThe problem cannot be reproducedThe problem cannot be reproducedarea:corekind:bugThis is a clearly a bugThis is a clearly a bug