@@ -21,7 +21,11 @@
 ## Airflow Dag Processor Deployment
 #################################
 {{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
-{{- if .Values.dagProcessor.enabled }}
+{{- $enabled := .Values.dagProcessor.enabled }}
+{{- if eq $enabled nil}}
+{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
+{{- end }}
+{{- if $enabled }}
 {{- $nodeSelector := or .Values.dagProcessor.nodeSelector .Values.nodeSelector }}
 {{- $affinity := or .Values.dagProcessor.affinity .Values.affinity }}
 {{- $tolerations := or .Values.dagProcessor.tolerations .Values.tolerations }}
@@ -21,7 +21,11 @@
 ## Airflow Dag Processor ServiceAccount
 #################################
 {{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
-{{- if and .Values.dagProcessor.serviceAccount.create .Values.dagProcessor.enabled }}
+{{- $enabled := .Values.dagProcessor.enabled }}
+{{- if eq $enabled nil}}
+{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
+{{- end }}
+{{- if and .Values.dagProcessor.serviceAccount.create $enabled }}
 apiVersion: v1
 kind: ServiceAccount
 automountServiceAccountToken: {{ .Values.dagProcessor.serviceAccount.automountServiceAccountToken }}
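With `dagProcessor.enabled` left at `~`, the ServiceAccount above follows the same version-based default as the Deployment. A minimal pytest-style sketch of how that could be exercised, modeled on the chart tests further down; the `render_chart` import path and the serviceaccount template filename are assumptions, not part of this PR:

```python
# Hypothetical companion test (not in this PR): the ServiceAccount should follow
# the same version-based default as the Deployment.
import pytest

from tests.charts.helm_template_generator import render_chart  # assumed import path


@pytest.mark.parametrize(
    "airflow_version, num_docs",
    [
        ("2.10.4", 0),  # enabled: ~ resolves to false below Airflow 3
        ("3.0.0", 1),   # enabled: ~ resolves to true on Airflow 3+
    ],
)
def test_dag_processor_serviceaccount_default_by_version(airflow_version, num_docs):
    docs = render_chart(
        values={"airflowVersion": airflow_version},
        show_only=["templates/dag-processor/dag-processor-serviceaccount.yaml"],  # assumed path
    )
    assert len(docs) == num_docs
```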
6 changes: 5 additions & 1 deletion chart/templates/scheduler/scheduler-deployment.yaml
@@ -29,7 +29,11 @@
 # If we're using a StatefulSet
 {{- $stateful := and $local $persistence }}
 # We can skip DAGs mounts on scheduler if dagProcessor is enabled, except with $local mode
-{{- $localOrDagProcessorDisabled := or (not .Values.dagProcessor.enabled) $local }}
+{{- $dagProcessorEnabled := .Values.dagProcessor.enabled }}
+{{- if eq $dagProcessorEnabled nil}}
+{{ $dagProcessorEnabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
+{{- end }}
+{{- $localOrDagProcessorDisabled := or (not $dagProcessorEnabled) $local }}
 # If we're using elasticsearch or opensearch logging
 {{- $remoteLogging := or .Values.elasticsearch.enabled .Values.opensearch.enabled }}
 {{- $nodeSelector := or .Values.scheduler.nodeSelector .Values.nodeSelector }}
7 changes: 5 additions & 2 deletions chart/values.schema.json
@@ -3586,8 +3586,11 @@
"properties": {
"enabled": {
"description": "Enable standalone dag processor (requires Airflow 2.3.0+).",
"type": "boolean",
"default": false
"type": [
"boolean",
"null"
],
"default": null
},
"livenessProbe": {
"description": "Liveness probe configuration for dag processor.",
4 changes: 2 additions & 2 deletions chart/values.yaml
@@ -1798,7 +1798,7 @@ triggerer:
 
 # Airflow Dag Processor Config
 dagProcessor:
-  enabled: false
+  enabled: ~
   # Number of airflow dag processors in the deployment
   replicas: 1
   # Max number of old replicasets to retain
@@ -2640,7 +2640,7 @@ config:
     flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}'
     worker_concurrency: 16
   scheduler:
-    standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}'
+    standalone_dag_processor: '{{ ternary "True" "False" (or (semverCompare ">=3.0.0" .Values.airflowVersion) (.Values.dagProcessor.enabled | default false)) }}'
     # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0
     statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
     statsd_port: 9125
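The `standalone_dag_processor` ternary above means an explicit `dagProcessor.enabled: true` now also drives the rendered `airflow.cfg` on Airflow 2.x, without setting `config.scheduler.standalone_dag_processor` directly. A rough sketch of that check, assuming the same `render_chart`/`jmespath` helpers used by the configmap tests below (import path assumed):

```python
# Hypothetical check (not in this PR): with the ternary above, setting
# dagProcessor.enabled explicitly should flip standalone_dag_processor
# even on Airflow 2.x.
import jmespath

from tests.charts.helm_template_generator import render_chart  # assumed import path


def test_dag_processor_enabled_flag_drives_config_on_airflow_2():
    docs = render_chart(
        values={"airflowVersion": "2.10.4", "dagProcessor": {"enabled": True}},
        show_only=["templates/configmaps/configmap.yaml"],
    )
    cfg = jmespath.search('data."airflow.cfg"', docs[0])
    assert "standalone_dag_processor = True" in cfg.splitlines()
```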
2 changes: 2 additions & 0 deletions helm_tests/airflow_aux/test_basic_helm_chart.py
@@ -143,8 +143,10 @@ def test_basic_deployments(self, version):
             expected.update(
                 (
                     ("Deployment", "test-basic-api-server"),
+                    ("Deployment", "test-basic-dag-processor"),
                     ("Service", "test-basic-api-server"),
                     ("ServiceAccount", "test-basic-api-server"),
+                    ("ServiceAccount", "test-basic-dag-processor"),
                     ("Service", "test-basic-triggerer"),
                 )
             )
39 changes: 39 additions & 0 deletions helm_tests/airflow_aux/test_configmap.py
@@ -201,3 +201,42 @@ def test_expected_default_dag_folder(self, dag_values, expected_default_dag_folder):
         cfg = jmespath.search('data."airflow.cfg"', docs[0])
         expected_folder_config = f"dags_folder = {expected_default_dag_folder}"
         assert expected_folder_config in cfg.splitlines()
+
+    @pytest.mark.parametrize(
+        "airflow_version, enabled",
+        [
+            ("2.10.4", False),
+            ("3.0.0", True),
+        ],
+    )
+    def test_default_standalone_dag_processor_by_airflow_version(self, airflow_version, enabled):
+        docs = render_chart(
+            values={"airflowVersion": airflow_version},
+            show_only=["templates/configmaps/configmap.yaml"],
+        )
+
+        cfg = jmespath.search('data."airflow.cfg"', docs[0])
+        expected_line = f"standalone_dag_processor = {enabled}"
+        assert expected_line in cfg.splitlines()
+
+    @pytest.mark.parametrize(
+        "airflow_version, enabled",
+        [
+            ("2.10.4", False),
+            ("2.10.4", True),
+            ("3.0.0", False),
+            ("3.0.0", True),
+        ],
+    )
+    def test_standalone_dag_processor_explicit(self, airflow_version, enabled):
+        docs = render_chart(
+            values={
+                "airflowVersion": airflow_version,
+                "config": {"scheduler": {"standalone_dag_processor": enabled}},
+            },
+            show_only=["templates/configmaps/configmap.yaml"],
+        )
+
+        cfg = jmespath.search('data."airflow.cfg"', docs[0])
+        expected_line = f"standalone_dag_processor = {str(enabled).lower()}"
+        assert expected_line in cfg.splitlines()
39 changes: 38 additions & 1 deletion helm_tests/airflow_core/test_dag_processor.py
@@ -43,7 +43,44 @@ def test_only_exists_on_new_airflow_versions(self, airflow_version, num_docs):
             show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
         )
 
-        assert num_docs == len(docs)
+        assert len(docs) == num_docs
+
+    @pytest.mark.parametrize(
+        "airflow_version, num_docs",
+        [
+            ("2.10.4", 0),
+            ("3.0.0", 1),
+        ],
+    )
+    def test_enabled_by_airflow_version(self, airflow_version, num_docs):
+        """Tests that Dag Processor is enabled by default with Airflow 3"""
+        docs = render_chart(
+            values={"airflowVersion": airflow_version},
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        assert len(docs) == num_docs
+
+    @pytest.mark.parametrize(
+        "airflow_version, enabled",
+        [
+            ("2.10.4", False),
+            ("2.10.4", True),
+            ("3.0.0", False),
+            ("3.0.0", True),
+        ],
+    )
+    def test_enabled_explicit(self, airflow_version, enabled):
+        """Tests that Dag Processor can be enabled/disabled regardless of version"""
+        docs = render_chart(
+            values={"airflowVersion": airflow_version, "dagProcessor": {"enabled": enabled}},
+            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
+        )
+
+        if enabled:
+            assert len(docs) == 1
+        else:
+            assert len(docs) == 0
 
     def test_can_be_disabled(self):
         """Standalone Dag Processor is disabled by default."""
47 changes: 31 additions & 16 deletions helm_tests/airflow_core/test_scheduler.py
@@ -759,34 +759,49 @@ def test_dags_gitsync_sidecar_and_init_container(self, dags_values):
         ]
 
     @pytest.mark.parametrize(
-        "dag_processor, executor, skip_dags_mount",
+        "airflow_version, dag_processor, executor, skip_dags_mount",
         [
-            (True, "LocalExecutor", False),
-            (True, "CeleryExecutor", True),
-            (True, "KubernetesExecutor", True),
-            (True, "LocalKubernetesExecutor", False),
-            (False, "LocalExecutor", False),
-            (False, "CeleryExecutor", False),
-            (False, "KubernetesExecutor", False),
-            (False, "LocalKubernetesExecutor", False),
+            # standalone dag_processor is optional on 2.10, so we can skip dags for non-local if it's on
+            ("2.10.4", True, "LocalExecutor", False),
+            ("2.10.4", True, "CeleryExecutor", True),
+            ("2.10.4", True, "KubernetesExecutor", True),
+            ("2.10.4", True, "LocalKubernetesExecutor", False),
+            # but if standalone dag_processor is off, we must always have dags
+            ("2.10.4", False, "LocalExecutor", False),
+            ("2.10.4", False, "CeleryExecutor", False),
+            ("2.10.4", False, "KubernetesExecutor", False),
+            ("2.10.4", False, "LocalKubernetesExecutor", False),
+            # by default, we don't have a standalone dag_processor
+            ("2.10.4", None, "LocalExecutor", False),
+            ("2.10.4", None, "CeleryExecutor", False),
+            ("2.10.4", None, "KubernetesExecutor", False),
+            ("2.10.4", None, "LocalKubernetesExecutor", False),
+            # but in airflow 3, standalone dag_processor is required, so we again can skip dags for non-local
+            ("3.0.0", None, "LocalExecutor", False),
+            ("3.0.0", None, "CeleryExecutor", True),
+            ("3.0.0", None, "KubernetesExecutor", True),
+            ("3.0.0", None, "LocalKubernetesExecutor", False),
         ],
     )
     def test_dags_mount_and_gitsync_expected_with_dag_processor(
-        self, dag_processor, executor, skip_dags_mount
+        self, airflow_version, dag_processor, executor, skip_dags_mount
     ):
         """
         DAG Processor can move gitsync and DAGs mount from the scheduler to the DAG Processor only.
 
         The only exception is when we have a Local executor.
         In these cases, the scheduler does the worker role and needs access to DAGs anyway.
         """
+        values = {
+            "airflowVersion": airflow_version,
+            "executor": executor,
+            "dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}},
+            "scheduler": {"logGroomerSidecar": {"enabled": False}},
+        }
+        if dag_processor is not None:
+            values["dagProcessor"] = {"enabled": dag_processor}
         docs = render_chart(
-            values={
-                "dagProcessor": {"enabled": dag_processor},
-                "executor": executor,
-                "dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}},
-                "scheduler": {"logGroomerSidecar": {"enabled": False}},
-            },
+            values=values,
            show_only=["templates/scheduler/scheduler-deployment.yaml"],
         )
 
8 changes: 8 additions & 0 deletions helm_tests/security/test_rbac.py
@@ -80,6 +80,7 @@
 
 CUSTOM_SERVICE_ACCOUNT_NAMES = (
     (CUSTOM_SCHEDULER_NAME := "TestScheduler"),
+    (CUSTOM_DAG_PROCESSOR_NAME := "TestDagProcessor"),
     (CUSTOM_WEBSERVER_NAME := "TestWebserver"),
     (CUSTOM_API_SERVER_NAME := "TestAPISserver"),
    (CUSTOM_WORKER_NAME := "TestWorker"),
@@ -120,10 +121,12 @@ def _get_object_tuples(version, sa: bool = True):
             (
                 ("Service", "test-rbac-api-server"),
                 ("Deployment", "test-rbac-api-server"),
+                ("Deployment", "test-rbac-dag-processor"),
             )
         )
         if sa:
             tuples.append(("ServiceAccount", "test-rbac-api-server"))
+            tuples.append(("ServiceAccount", "test-rbac-dag-processor"))
         return tuples
 
     @parametrize_version
@@ -148,6 +151,7 @@ def test_deployments_no_rbac_no_sa(self, version):
                 },
                 "redis": {"serviceAccount": {"create": False}},
                 "scheduler": {"serviceAccount": {"create": False}},
+                "dagProcessor": {"serviceAccount": {"create": False}},
                 "webserver": {"serviceAccount": {"create": False}},
                 "apiServer": {"serviceAccount": {"create": False}},
                 "workers": {"serviceAccount": {"create": False}},
Expand Down Expand Up @@ -200,6 +204,7 @@ def test_deployments_with_rbac_no_sa(self, version):
                     },
                 },
                 "scheduler": {"serviceAccount": {"create": False}},
+                "dagProcessor": {"serviceAccount": {"create": False}},
                 "webserver": {"serviceAccount": {"create": False}},
                 "apiServer": {"serviceAccount": {"create": False}},
                 "workers": {"serviceAccount": {"create": False}},
Expand Down Expand Up @@ -260,6 +265,7 @@ def test_service_account_custom_names(self):
                     },
                 },
                 "scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}},
+                "dagProcessor": {"serviceAccount": {"name": CUSTOM_DAG_PROCESSOR_NAME}},
                 "webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}},
                 "apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}},
                 "workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}},
Expand Down Expand Up @@ -298,6 +304,7 @@ def test_service_account_custom_names_in_objects(self):
                     },
                 },
                 "scheduler": {"serviceAccount": {"name": CUSTOM_SCHEDULER_NAME}},
+                "dagProcessor": {"serviceAccount": {"name": CUSTOM_DAG_PROCESSOR_NAME}},
                 "webserver": {"serviceAccount": {"name": CUSTOM_WEBSERVER_NAME}},
                 "apiServer": {"serviceAccount": {"name": CUSTOM_API_SERVER_NAME}},
                 "workers": {"serviceAccount": {"name": CUSTOM_WORKER_NAME}},
Expand Down Expand Up @@ -353,6 +360,7 @@ def test_service_account_without_resource(self):
         ]
         service_account_names = [
             "test-rbac-scheduler",
+            "test-rbac-dag-processor",
             "test-rbac-webserver",
             "test-rbac-api-server",
             "test-rbac-triggerer",