Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ def message(self) -> str | None:
ConfigChange(
config=ConfigParameter("database", "load_default_connections"),
),
# triggerer
ConfigChange(
config=ConfigParameter("triggerer", "default_capacity"),
renamed_to=ConfigParameter("triggerer", "capacity"),
),
]


Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2587,7 +2587,7 @@ scheduler:
triggerer:
description: ~
options:
default_capacity:
capacity:
description: |
How many triggers a single Triggerer will run at once, by default.
version_added: 2.2.0
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"),
("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"),
("api", "access_logfile"): ("webserver", "access_logfile", "3.0"),
("triggerer", "capacity"): ("triggerer", "default_capacity", "3.0"),
}

# A mapping of new section -> (old section, since_version).
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __init__(
):
super().__init__(job)
if capacity is None:
self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
self.capacity = conf.getint("triggerer", "capacity")
elif isinstance(capacity, int) and capacity > 0:
self.capacity = capacity
else:
Expand Down
6 changes: 6 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ server_tls_key_file = /etc/pgbouncer/server.key
{{- include "_serviceAccountName" (merge (dict "key" "triggerer") .) -}}
{{- end }}

{{/* Determine trigger capacity, taking Airflow 2 and 3 config option differences into account */}}
{{- define "triggerer.capacity" -}}
{{- $triggerer_section := .Values.config.triggerer | default dict }}
{{- $triggerer_section.capacity | default $triggerer_section.default_capacity | default 1000 | int -}}
{{- end -}}

{{/* Create the name of the dag processor service account to use */}}
{{- define "dagProcessor.serviceAccountName" -}}
{{- include "_serviceAccountName" (merge (dict "key" "dagProcessor" "nameSuffix" "dag-processor") .) -}}
Expand Down
2 changes: 1 addition & 1 deletion chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3580,7 +3580,7 @@
"query": {
"description": "Query to use for KEDA autoscaling. Must return a single integer.",
"type": "string",
"default": "SELECT ceil(COUNT(*)::decimal / {{ .Values.config.triggerer.default_capacity }}) FROM trigger"
"default": "SELECT ceil(COUNT(*)::decimal / {{ include \"triggerer.capacity\" . }}) FROM trigger"
},
"usePgbouncer": {
"description": "Whether to use PGBouncer to connect to the database or not when it is enabled. This configuration will be ignored if PGBouncer is not enabled.",
Expand Down
5 changes: 2 additions & 3 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ triggerer:

# Query to use for KEDA autoscaling. Must return a single integer.
query: >-
SELECT ceil(COUNT(*)::decimal / {{ .Values.config.triggerer.default_capacity }})
SELECT ceil(COUNT(*)::decimal / {{ include "triggerer.capacity" . }})
FROM trigger

# Whether to use PGBouncer to connect to the database or not when it is enabled
Expand Down Expand Up @@ -2710,8 +2710,7 @@ config:
worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
multi_namespace_mode: '{{ ternary "True" "False" .Values.multiNamespaceMode }}'
triggerer:
default_capacity: 1000

# yamllint enable rule:line-length

# Whether Airflow can launch workers and/or pods in multiple namespaces
Expand Down
2 changes: 1 addition & 1 deletion helm_tests/airflow_core/test_triggerer.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ def test_include_event_source_container_name_in_scaled_object_for_triggerer(self
),
# test custom template query
(
"SELECT ceil(COUNT(*)::decimal / {{ mul .Values.config.triggerer.default_capacity 2 }})"
'SELECT ceil(COUNT(*)::decimal / {{ mul (include "triggerer.capacity" . | int) 2 }})'
" FROM trigger",
"SELECT ceil(COUNT(*)::decimal / 2000) FROM trigger",
),
Expand Down
Loading