Skip to content

Commit

Permalink
updated log messages and max_active_tasks_include_deferred config des…
Browse files Browse the repository at this point in the history
…cription
  • Loading branch information
gopidesupavan committed Aug 23, 2024
1 parent 4897a9e commit 42a1ae6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
5 changes: 2 additions & 3 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,10 @@ core:
default: "16"
max_active_tasks_include_deferred:
description: |
It includes the active deferred task count in the calculation for max_active_tasks. To calculate
If True, deferred tasks are considered active when calculating max_active_tasks. To calculate
the number of tasks that is running concurrently for a DAG with this config, add up the number of
running and deferred tasks for all DAG runs of the DAG. This is configurable at the DAG level with
``max_active_tasks_include_deferred``, which is defaulted as
``[core] max_active_tasks_include_deferred``.
``max_active_tasks_include_deferred``, which defaults to this value.
version_added: ~
type: boolean
example: ~
Expand Down
49 changes: 28 additions & 21 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,35 +494,42 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
current_active_tasks_per_dag = concurrency_map.dag_active_tasks_map[dag_id]
max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks

msg_description_0 = "DAG %s has %s/%s running and queued tasks"
msg_description_1 = (
"Not executing %s since the number of tasks running or queued from DAG "
"%s is >= to the DAG's max_active_tasks limit of %s"
)

if task_instance.dag_model.max_active_tasks_include_deferred:
current_active_tasks_per_dag += concurrency_map_deferred.dag_active_tasks_map[dag_id]

msg_description_0 = "DAG %s has %s/%s running, queued and deferred tasks"
msg_description_1 = (
"Not executing %s since the number of tasks running or queued or "
"deferred from DAG"
"%s is >= to the DAG's max_active_tasks limit of %s"
if task_instance.dag_model.max_active_tasks_include_deferred:
self.log.info(
"DAG %s has %s/%s running, queued and deferred tasks",
dag_id,
current_active_tasks_per_dag,
max_active_tasks_per_dag_limit,
)

self.log.info(
msg_description_0,
dag_id,
current_active_tasks_per_dag,
max_active_tasks_per_dag_limit,
)
if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
else:
self.log.info(
msg_description_1,
task_instance,
"DAG %s has %s/%s running and queued tasks",
dag_id,
current_active_tasks_per_dag,
max_active_tasks_per_dag_limit,
)

if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
if task_instance.dag_model.max_active_tasks_include_deferred:
self.log.info(
"Not executing %s since the number of tasks running or queued or "
"deferred from DAG"
"%s is >= to the DAG's max_active_tasks limit of %s",
task_instance,
dag_id,
max_active_tasks_per_dag_limit,
)
else:
self.log.info(
"Not executing %s since the number of tasks running or queued from DAG "
"%s is >= to the DAG's max_active_tasks limit of %s",
task_instance,
dag_id,
max_active_tasks_per_dag_limit,
)
starved_dags.add(dag_id)
continue

Expand Down

0 comments on commit 42a1ae6

Please sign in to comment.