Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions chart/templates/NOTES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,14 @@ DEPRECATION WARNING:

{{- end }}

{{- if not (empty .Values.workers.podManagementPolicy) }}

DEPRECATION WARNING:
`workers.podManagementPolicy` has been renamed to `workers.celery.podManagementPolicy`.
Please change your values as support for the old name will be dropped in a future release.

{{- end }}

{{ if (semverCompare ">=3.0.0" .Values.airflowVersion) }}
#####################################################
# WARNING: You should set a static API secret key #
Expand Down
4 changes: 2 additions & 2 deletions chart/templates/workers/worker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ spec:
tier: airflow
component: worker
release: {{ .Release.Name }}
{{- if and $persistence .Values.workers.podManagementPolicy }}
podManagementPolicy: {{ .Values.workers.podManagementPolicy }}
{{- if and $persistence (or .Values.workers.celery.podManagementPolicy .Values.workers.podManagementPolicy) }}
podManagementPolicy: {{ .Values.workers.celery.podManagementPolicy | default .Values.workers.podManagementPolicy }}
{{- end }}
{{- if and $persistence (or .Values.workers.celery.updateStrategy .Values.workers.updateStrategy) }}
updateStrategy: {{- toYaml (.Values.workers.celery.updateStrategy | default .Values.workers.updateStrategy) | nindent 4 }}
Expand Down
14 changes: 13 additions & 1 deletion chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@
"default": null
},
"podManagementPolicy": {
"description": "Specifies the policy for managing pods within the Airflow Celery worker. Only applicable to StatefulSet.",
"description": "Specifies the policy for managing pods within the Airflow Celery worker (deprecated, use `workers.celery.podManagementPolicy` instead). Only applicable to StatefulSet.",
"type": [
"null",
"string"
Expand Down Expand Up @@ -2682,6 +2682,18 @@
}
}
},
"podManagementPolicy": {
"description": "Specifies the policy for managing pods within the Airflow Celery worker. Only applicable to StatefulSet.",
"type": [
"null",
"string"
],
"default": null,
"enum": [
"OrderedReady",
"Parallel"
]
},
"serviceAccount": {
"description": "Create ServiceAccount.",
"type": "object",
Expand Down
5 changes: 5 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ workers:
maxUnavailable: "50%"

# Allow relaxing ordering guarantees for Airflow Celery worker while preserving its uniqueness and identity
# (deprecated, use `workers.celery.podManagementPolicy` instead)
# podManagementPolicy: Parallel

# When not set, the values defined in the global securityContext will
Expand Down Expand Up @@ -1065,6 +1066,10 @@ workers:
maxSurge: "100%"
maxUnavailable: "50%"

# Allow relaxing ordering guarantees for Airflow Celery worker
# while preserving its uniqueness and identity
# podManagementPolicy: Parallel

# Create ServiceAccount for Airflow Celery workers
serviceAccount:
# default value is true
Expand Down
39 changes: 38 additions & 1 deletion helm-tests/tests/helm_tests/airflow_core/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import jmespath
import pytest
from chart_utils.helm_template_generator import render_chart
from chart_utils.helm_template_generator import HelmFailedError, render_chart
from chart_utils.log_groomer import LogGroomerTestBase


Expand Down Expand Up @@ -1593,6 +1593,43 @@ def test_init_container_volume_permissions_not_exist(self, workers_values):
== 0
)

def test_pod_management_policy_default(self):
docs = render_chart(show_only=["templates/workers/worker-deployment.yaml"])
assert jmespath.search("spec.podManagementPolicy", docs[0]) is None

@pytest.mark.parametrize(
("workers_values", "expected"),
[
({"podManagementPolicy": "Parallel"}, "Parallel"),
({"podManagementPolicy": "OrderedReady"}, "OrderedReady"),
({"celery": {"podManagementPolicy": "Parallel"}}, "Parallel"),
({"celery": {"podManagementPolicy": "OrderedReady"}}, "OrderedReady"),
(
{"podManagementPolicy": "OrderedReady", "celery": {"podManagementPolicy": "Parallel"}},
"Parallel",
),
(
{"podManagementPolicy": "Parallel", "celery": {"podManagementPolicy": "OrderedReady"}},
"OrderedReady",
),
],
)
def test_pod_management_policy(self, workers_values, expected):
docs = render_chart(
values={"workers": workers_values}, show_only=["templates/workers/worker-deployment.yaml"]
)

assert jmespath.search("spec.podManagementPolicy", docs[0]) == expected

@pytest.mark.parametrize(
"workers_values", [{"podManagementPolicy": "Test"}, {"celery": {"podManagementPolicy": "Test"}}]
)
def test_pod_management_policy_not_valid_value(self, workers_values):
with pytest.raises(HelmFailedError):
render_chart(
values={"workers": workers_values}, show_only=["templates/workers/worker-deployment.yaml"]
)


class TestWorkerLogGroomer(LogGroomerTestBase):
"""Worker groomer."""
Expand Down