Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1147,3 +1147,18 @@ capabilities:
app: keda-operator
{{- end }}
{{- end }}

{{/*
Convert a Kubernetes CPU limit (e.g., "500m", "1.5", "2", "750m") into an integer number of CPU cores.
*/}}
{{- define "cpu_count" -}}
{{- $v := toString . -}}
{{- if hasSuffix "m" $v -}}
{{- /* millicores path: e.g. 500m, 1500m */ -}}
{{- $m := float64 (trimSuffix "m" $v) -}}
{{- int (ceil (divf $m 1000)) -}}
{{- else -}}
{{- /* plain cores: e.g. 0.5, 1, 1.5 */ -}}
{{- int (ceil (float64 $v)) -}}
{{- end -}}
{{- end }}
1 change: 1 addition & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3011,6 +3011,7 @@ config:
celery:
flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}'
worker_concurrency: 16
sync_parallelism: '{{ include "cpu_count" (((.Values.scheduler).resources).limits).cpu }}'
scheduler:
standalone_dag_processor: '{{ ternary "True" "False" (or (semverCompare ">=3.0.0" .Values.airflowVersion) (.Values.dagProcessor.enabled | default false)) }}'
# statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
Expand Down
27 changes: 27 additions & 0 deletions helm-tests/tests/helm_tests/airflow_aux/test_configmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,30 @@ def test_execution_api_server_url(
assert "execution_api_server_url" not in config, (
"execution_api_server_url should not be set for Airflow 2.x versions"
)

@pytest.mark.parametrize(
("scheduler_cpu_limit", "expected_sync_parallelism"),
[
("1m", "1"),
("1000m", "1"),
("1001m", "2"),
("0.1", "1"),
("1", "1"),
("1.01", "2"),
(None, 0),
(0, 0),
],
)
def test_expected_celery_sync_parallelism(self, scheduler_cpu_limit, expected_sync_parallelism):
scheduler_resources_cpu_limit = {}
if scheduler_cpu_limit is not None:
scheduler_resources_cpu_limit = {
"scheduler": {"resources": {"limits": {"cpu": scheduler_cpu_limit}}}
}

configmap = render_chart(
values=scheduler_resources_cpu_limit,
show_only=["templates/configmaps/configmap.yaml"],
)
config = jmespath.search('data."airflow.cfg"', configmap[0])
assert f"\nsync_parallelism = {expected_sync_parallelism}\n" in config