Task group expand fails on empty list at get_relevant_upstream_map_indexes #30073

@badrisrinivas9

Description

Apache Airflow version

2.5.1

What happened

Expansion of a task group fails when the expanded list is empty and a task inside the group references its mapped index in an xcom_pull from that group.

The scheduler throws the error below:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/bin/airflow", line 8, in
sys.exit(main())
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/main.py", line 39, in main
args.func(args)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 108, in wrapper
return f(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 73, in scheduler
_run_scheduler_job(args=args)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
job.run()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 258, in run
self._execute()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 759, in _execute
self._run_scheduler_loop()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 885, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 964, in _do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py", line 78, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/init.py", line 384, in iter
do = self.iter(retry_state=retry_state)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/init.py", line 351, in iter
return fut.result()
File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py", line 87, in wrapped_function
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1253, in _schedule_all_dag_runs
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1322, in _schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 563, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 793, in _get_ready_tis
if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1070, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1091, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 107, in get_dep_statuses
yield from self._get_dep_statuses(ti, session, cxt)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 93, in _get_dep_statuses
yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 219, in evaluate_trigger_rule
.filter(or
(
_iter_upstream_conditions()))
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 191, in _iter_upstream_conditions
map_indexes = _get_relevant_upstream_map_indexes(upstream_id)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 138, in _get_relevant_upstream_map_indexes
return ti.get_relevant_upstream_map_indexes(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2652, in get_relevant_upstream_map_indexes
ancestor_map_index = self.map_index * ancestor_ti_count // ti_count
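
The traceback is truncated before the exception message, but the last frame already points at the problem: when the mapped list is empty, ti_count presumably ends up as 0 (or None if the upstream has not been resolved yet), so the floor division can only raise (ZeroDivisionError for 0, TypeError for None). A minimal, standalone illustration with made-up values:

    # Made-up values illustrating the failing expression from the last frame above.
    map_index = -1          # a not-yet-expanded task instance has map_index == -1
    ancestor_ti_count = 1

    for ti_count in (0, None):
        try:
            print(map_index * ancestor_ti_count // ti_count)
        except (ZeroDivisionError, TypeError) as exc:
            print(f"ti_count={ti_count!r} raises {type(exc).__name__}")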

What you think should happen instead

When the expanded list is empty, every task in the task group should be skipped.
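
For comparison, a plain mapped task that expands over an empty list is simply skipped, and the same would be expected for a whole mapped task group. A minimal sketch of that baseline behaviour (DAG and task names are made up for illustration):

    import pendulum
    from airflow.decorators import dag, task


    @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule=None, catchup=False)
    def empty_expand_example():
        @task
        def consumer(x):
            return x

        # Zero-length input: no mapped task instances are created and consumer
        # is marked skipped rather than crashing the scheduler.
        consumer.expand(x=[])


    empty_expand_example()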

How to reproduce

from airflow.operators.bash import BashOperator
import pendulum

from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator


@dag(dag_id="test", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule=None, catchup=False,
     render_template_as_native_obj=True,
     )
def testdag():
    task1 = EmptyOperator(task_id="get_attribute_can_json_mapping")

    @task
    def lkp_schema_output_mapping(**context):
        return 1

    @task
    def task2(**context):
        return 2

    @task
    def task3(table_list, **context):
        # Returns an empty list, so group2 expands over zero elements.
        return []

    [task2(), task1,
     group2.expand(file_name=task3(table_list=task2()))]


@task_group(
    group_id="group2"
)
def group2(file_name):
    @task
    def get_table_name(name):
        return "testing"

    table_name = get_table_name(file_name)

    run_this = BashOperator(
        task_id="run_this",
        bash_command="echo {{task_instance.xcom_pull(task_ids='group2.get_table_name',"
                     "map_indexes=task_instance.map_index)}}",
    )

    table_name >> run_this


dag = testdag()

if __name__ == "__main__":
    dag.test()

Operating System

Debian GNU/Linux 11

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-apache-cassandra==3.1.0
apache-airflow-providers-apache-drill==2.3.1
apache-airflow-providers-apache-druid==3.3.1
apache-airflow-providers-apache-hdfs==3.2.0
apache-airflow-providers-apache-hive==5.1.1
apache-airflow-providers-apache-pinot==4.0.1
apache-airflow-providers-arangodb==2.1.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cloudant==3.1.0
apache-airflow-providers-cncf-kubernetes==5.1.1
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-databricks==4.0.0
apache-airflow-providers-docker==3.4.0
apache-airflow-providers-elasticsearch==4.3.3
apache-airflow-providers-exasol==4.1.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-google==8.8.0
apache-airflow-providers-grpc==3.1.0
apache-airflow-providers-hashicorp==3.2.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-influxdb==2.1.0
apache-airflow-providers-microsoft-azure==5.1.0
apache-airflow-providers-microsoft-mssql==3.3.2
apache-airflow-providers-mongo==3.1.1
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-neo4j==3.2.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-presto==4.2.1
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-sftp==4.2.1
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-sqlite==3.3.1
apache-airflow-providers-ssh==3.4.0
apache-airflow-providers-trino==4.3.1
apache-airflow-providers-vertica==3.3.1

Deployment

Other

Deployment details

No response

Anything else

I manually added the check below in taskinstance.py (in the get_relevant_upstream_map_indexes method) and the DAG ran fine. Please check whether the same change can be implemented.

    if ti_count is None or ti_count == 0:
        return None
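
A rough, standalone sketch of that guard (the helper name and example values are made up for illustration; the actual change would go inside TaskInstance.get_relevant_upstream_map_indexes, just before the floor division shown in the traceback):

    from typing import Optional


    def ancestor_map_index_or_none(
        map_index: int, ancestor_ti_count: int, ti_count: Optional[int]
    ) -> Optional[int]:
        # Proposed guard: an empty expansion leaves ti_count at 0 (or None before
        # the upstream is resolved), so return None instead of dividing.
        if ti_count is None or ti_count == 0:
            return None
        return map_index * ancestor_ti_count // ti_count


    print(ancestor_map_index_or_none(0, 2, 4))   # 0
    print(ancestor_map_index_or_none(-1, 1, 0))  # None instead of ZeroDivisionError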

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
