Apache Airflow version
2.5.1
What happened
Expanding a task group fails when the input list is empty and a task inside that group references its mapped index in an xcom_pull.
The scheduler throws the error below:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/bin/airflow", line 8, in
sys.exit(main())
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/main.py", line 39, in main
args.func(args)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/cli.py", line 108, in wrapper
return f(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 73, in scheduler
_run_scheduler_job(args=args)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
job.run()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 258, in run
self._execute()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 759, in _execute
self._run_scheduler_loop()
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 885, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 964, in _do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py", line 78, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/init.py", line 384, in iter
do = self.iter(retry_state=retry_state)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/tenacity/init.py", line 351, in iter
return fut.result()
File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/retries.py", line 87, in wrapped_function
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1253, in _schedule_all_dag_runs
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1322, in _schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 563, in update_state
info = self.task_instance_scheduling_decisions(session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/dagrun.py", line 793, in _get_ready_tis
if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1070, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1091, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 107, in get_dep_statuses
yield from self._get_dep_statuses(ti, session, cxt)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 93, in _get_dep_statuses
yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context, session=session)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 219, in evaluate_trigger_rule
.filter(or(_iter_upstream_conditions()))
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 191, in _iter_upstream_conditions
map_indexes = _get_relevant_upstream_map_indexes(upstream_id)
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/ti_deps/deps/trigger_rule_dep.py", line 138, in _get_relevant_upstream_map_indexes
return ti.get_relevant_upstream_map_indexes(
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2652, in get_relevant_upstream_map_indexes
ancestor_map_index = self.map_index * ancestor_ti_count // ti_count
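The last frame is the map-index arithmetic in get_relevant_upstream_map_indexes. A standalone sketch (with made-up values, not taken from the report) of how that expression behaves when the mapped task-instance count is missing or zero:

# Stand-in values; in Airflow these come from the task instances being scheduled.
map_index, ancestor_ti_count = 0, 1

for ti_count in (None, 0):
    try:
        print(map_index * ancestor_ti_count // ti_count)
    except (TypeError, ZeroDivisionError) as exc:
        # ti_count=None -> TypeError, ti_count=0 -> ZeroDivisionError
        print(f"ti_count={ti_count!r}: {exc!r}")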
What you think should happen instead
When the list is empty, all tasks in the task group should be skipped.
How to reproduce
from airflow.operators.bash import BashOperator
from airflow.operators.python import get_current_context
import pendulum
from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator


@dag(dag_id="test", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     schedule=None, catchup=False,
     render_template_as_native_obj=True,
)
def testdag():
    task1 = EmptyOperator(task_id="get_attribute_can_json_mapping")

    @task
    def lkp_schema_output_mapping(**context):
        return 1

    @task
    def task2(**context):
        return 2

    @task
    def task3(table_list, **context):
        return []

    [task2(), task1,
     group2.expand(file_name=task3(table_list=task2()))]


@task_group(group_id="group2")
def group2(file_name):
    @task
    def get_table_name(name):
        return "testing"

    table_name = get_table_name(file_name)
    run_this = BashOperator(
        task_id="run_this",
        bash_command="echo {{task_instance.xcom_pull(task_ids='copy_to_staging.get_table_name',"
                     "map_indexes=task_instance.map_index)}}",
    )
    table_name >> run_this


dag = testdag()

if __name__ == "__main__":
    dag.test()
Operating System
Debian GNU/Linux 11
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-apache-cassandra==3.1.0
apache-airflow-providers-apache-drill==2.3.1
apache-airflow-providers-apache-druid==3.3.1
apache-airflow-providers-apache-hdfs==3.2.0
apache-airflow-providers-apache-hive==5.1.1
apache-airflow-providers-apache-pinot==4.0.1
apache-airflow-providers-arangodb==2.1.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cloudant==3.1.0
apache-airflow-providers-cncf-kubernetes==5.1.1
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-databricks==4.0.0
apache-airflow-providers-docker==3.4.0
apache-airflow-providers-elasticsearch==4.3.3
apache-airflow-providers-exasol==4.1.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-google==8.8.0
apache-airflow-providers-grpc==3.1.0
apache-airflow-providers-hashicorp==3.2.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-influxdb==2.1.0
apache-airflow-providers-microsoft-azure==5.1.0
apache-airflow-providers-microsoft-mssql==3.3.2
apache-airflow-providers-mongo==3.1.1
apache-airflow-providers-mysql==4.0.0
apache-airflow-providers-neo4j==3.2.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-presto==4.2.1
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-sftp==4.2.1
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-sqlite==3.3.1
apache-airflow-providers-ssh==3.4.0
apache-airflow-providers-trino==4.3.1
apache-airflow-providers-vertica==3.3.1
Deployment
Other
Deployment details
No response
Anything else
I manually added the check below in taskinstance.py (the get_relevant_upstream_map_indexes method) and the DAG ran fine. Please check whether the same change can be implemented.
if ti_count is None or ti_count == 0:
    return None
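A minimal sketch of that guard in isolation (relevant_ancestor_map_index is a hypothetical helper, not the actual Airflow method; it only wraps the arithmetic from the last traceback frame in the proposed check):

from typing import Optional

def relevant_ancestor_map_index(map_index: int, ancestor_ti_count: int,
                                ti_count: Optional[int]) -> Optional[int]:
    # Proposed guard: with an empty expansion there is nothing to map back to,
    # so return None before the integer division instead of crashing.
    if ti_count is None or ti_count == 0:
        return None
    return map_index * ancestor_ti_count // ti_count

print(relevant_ancestor_map_index(0, 1, None))  # None instead of a TypeError
print(relevant_ancestor_map_index(0, 1, 0))     # None instead of a ZeroDivisionError
print(relevant_ancestor_map_index(1, 4, 2))     # 2, the normal case is unchanged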
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
