From 681005693cb99c926bd4c747e36dda7b1baf0649 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 23 Aug 2024 21:04:45 +0100 Subject: [PATCH] updated log messages and max_active_tasks_include_deferred config description --- airflow/config_templates/config.yml | 5 ++- airflow/jobs/scheduler_job_runner.py | 49 ++++++++++++++++------------ 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 13d9613a0a5922..66e465954ea13c 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -109,11 +109,10 @@ core: default: "16" max_active_tasks_include_deferred: description: | - It includes the active deferred task count in the calculation for max_active_tasks. To calculate + If True, deferred tasks are considered active when calculating max_active_tasks. To calculate the number of tasks that is running concurrently for a DAG with this config, add up the number of running and deferred tasks for all DAG runs of the DAG. This is configurable at the DAG level with - ``max_active_tasks_include_deferred``, which is defaulted as - ``[core] max_active_tasks_include_deferred``. + ``max_active_tasks_include_deferred``, which defaults to this value. version_added: ~ type: boolean example: ~ diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 976c9ca0e52f87..2af94c476fda22 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -471,35 +471,42 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - current_active_tasks_per_dag = concurrency_map.dag_active_tasks_map[dag_id] max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks - msg_description_0 = "DAG %s has %s/%s running and queued tasks" - msg_description_1 = ( - "Not executing %s since the number of tasks running or queued from DAG " - "%s is >= to the DAG's max_active_tasks limit of %s" - ) - if task_instance.dag_model.max_active_tasks_include_deferred: current_active_tasks_per_dag += concurrency_map_deferred.dag_active_tasks_map[dag_id] - msg_description_0 = "DAG %s has %s/%s running, queued and deferred tasks" - msg_description_1 = ( - "Not executing %s since the number of tasks running or queued or " - "deferred from DAG" - "%s is >= to the DAG's max_active_tasks limit of %s" + if task_instance.dag_model.max_active_tasks_include_deferred: + self.log.info( + "DAG %s has %s/%s running, queued and deferred tasks", + dag_id, + current_active_tasks_per_dag, + max_active_tasks_per_dag_limit, ) - - self.log.info( - msg_description_0, - dag_id, - current_active_tasks_per_dag, - max_active_tasks_per_dag_limit, - ) - if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit: + else: self.log.info( - msg_description_1, - task_instance, + "DAG %s has %s/%s running and queued tasks", dag_id, + current_active_tasks_per_dag, max_active_tasks_per_dag_limit, ) + + if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit: + if task_instance.dag_model.max_active_tasks_include_deferred: + self.log.info( + "Not executing %s since the number of tasks running or queued or " + "deferred from DAG" + "%s is >= to the DAG's max_active_tasks limit of %s", + task_instance, + dag_id, + max_active_tasks_per_dag_limit, + ) + else: + self.log.info( + "Not executing %s since the number of tasks running or queued from DAG " + "%s is >= to the DAG's max_active_tasks limit of %s", + task_instance, + dag_id, + max_active_tasks_per_dag_limit, + ) starved_dags.add(dag_id) continue