Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1202,3 +1202,26 @@ Usage:
{{- end -}}
{{- $result -}}
{{- end -}}

{{/*
Convert dagBundleConfigList YAML list to JSON string for dag_bundle_config_list.
This helper function converts the structured YAML format to the JSON string
format required by Airflow's dag_bundle_config_list configuration.

Usage:
config:
dag_processor:
dag_bundle_config_list: '{{ include "dag_bundle_config_list" . }}'
*/}}
{{- define "dag_bundle_config_list" -}}
{{- if .Values.dagProcessor.dagBundleConfigList -}}
{{- $bundles := list -}}
{{- range .Values.dagProcessor.dagBundleConfigList -}}
{{- $bundle := dict "name" .name "classpath" .classpath "kwargs" (.kwargs | default dict) -}}
{{- $bundles = append $bundles $bundle -}}
{{- end -}}
{{- $bundles | toJson -}}
{{- else -}}
{{- "[]" -}}
{{- end -}}
{{- end }}
42 changes: 42 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4382,6 +4382,48 @@
],
"default": null
},
"dagBundleConfigList": {
"description": "Define Dag bundles in a structured YAML format. This will be automatically converted to JSON string format for config.dag_processor.dag_bundle_config_list.",
"type": "array",
"default": [
{
"name": "dags-folder",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {}
}
],
"items": {
"type": "object",
"additionalProperties": false,
"required": [
"name",
"classpath"
],
"properties": {
"name": {
"description": "Name of the Dag bundle.",
"type": "string"
},
"classpath": {
"description": "Class path of the Dag bundle class.",
"type": "string"
},
"kwargs": {
"description": "Keyword arguments for the Dag bundle.",
"type": "object",
"default": {},
"additionalProperties": {
"type": [
"string",
"number",
"boolean",
"null"
]
}
}
}
}
},
"livenessProbe": {
"description": "Liveness probe configuration for dag processor.",
"type": "object",
Expand Down
34 changes: 34 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,34 @@ triggerer:
# Airflow Dag Processor Config
dagProcessor:
enabled: ~

# Dag Bundle Configuration
# Define Dag bundles in a structured YAML format. This will be automatically
# converted to JSON string format for config.dag_processor.dag_bundle_config_list.
dagBundleConfigList:
- name: dags-folder
classpath: "airflow.dag_processing.bundles.local.LocalDagBundle"
kwargs: {}
# Example:
# dagBundleConfigList:
# - name: bundle1
# classpath: "airflow.providers.git.bundles.git.GitDagBundle"
# kwargs:
# git_conn_id: "GITHUB__repo1"
# subdir: "dags"
# tracking_ref: "main"
# refresh_interval: 60
# - name: bundle2
# classpath: "airflow.providers.git.bundles.git.GitDagBundle"
# kwargs:
# git_conn_id: "GITHUB__repo2"
# subdir: "dags"
# tracking_ref: "develop"
# refresh_interval: 120
# - name: dags-folder
# classpath: "airflow.dag_processing.bundles.local.LocalDagBundle"
# kwargs: {}

# Number of airflow dag processors in the deployment
replicas: 1
# Max number of old replicasets to retain
Expand Down Expand Up @@ -3268,6 +3296,12 @@ config:
statsd_host: '{{ printf "%s-statsd" (include "airflow.fullname" .) }}'
# `run_duration` included for Airflow 1.10 backward compatibility; removed in 2.0.
run_duration: 41460
dag_processor:
# Dag bundle configuration list in JSON string format.
# This is automatically generated from .Values.dagProcessor.dagBundleConfigList using the dag_bundle_config_list helper function.
# Deprecated: Direct override via config.dag_processor.dag_bundle_config_list is deprecated.
# Use dagProcessor.dagBundleConfigList instead.
dag_bundle_config_list: '{{ include "dag_bundle_config_list" . }}'
elasticsearch:
json_format: 'True'
log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
Expand Down
77 changes: 77 additions & 0 deletions helm-tests/tests/helm_tests/airflow_aux/test_configmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
from __future__ import annotations

import json
import re

import jmespath
import pytest
from chart_utils.helm_template_generator import render_chart
Expand Down Expand Up @@ -374,3 +377,77 @@ def test_expected_celery_sync_parallelism(self, scheduler_cpu_limit, expected_sy
)
config = jmespath.search('data."airflow.cfg"', configmap[0])
assert f"\nsync_parallelism = {expected_sync_parallelism}\n" in config

def test_dag_bundle_config_list(self):
"""Test dag_bundle_config_list is generated from dagProcessor.dagBundleConfigList."""
docs = render_chart(
values={
"dagProcessor": {
"dagBundleConfigList": [
{
"name": "bundle1",
"classpath": "airflow.providers.git.bundles.git.GitDagBundle",
"kwargs": {
"git_conn_id": "GITHUB__repo1",
"subdir": "dags",
"tracking_ref": "main",
"refresh_interval": 60,
},
},
{
"name": "bundle2",
"classpath": "airflow.providers.git.bundles.git.GitDagBundle",
"kwargs": {
"git_conn_id": "GITHUB__repo2",
"subdir": "dags",
"tracking_ref": "develop",
"refresh_interval": 120,
},
},
{
"name": "dags-folder",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {},
},
],
},
},
show_only=["templates/configmaps/configmap.yaml"],
)

cfg = jmespath.search('data."airflow.cfg"', docs[0])
assert "[dag_processor]" in cfg

# Extract the JSON value from dag_bundle_config_list
match = re.search(r"dag_bundle_config_list\s*=\s*(.+)", cfg)
assert match is not None, "dag_bundle_config_list not found in config"
json_str = match.group(1).strip()

# Parse and validate JSON structure
bundles = json.loads(json_str)
assert isinstance(bundles, list), "dag_bundle_config_list should be a list"
assert len(bundles) == 3, f"Expected 3 bundles, got {len(bundles)}"

# Verify bundle1
bundle1 = next((b for b in bundles if b["name"] == "bundle1"), None)
assert bundle1 is not None, "bundle1 not found"
assert bundle1["classpath"] == "airflow.providers.git.bundles.git.GitDagBundle"
assert bundle1["kwargs"]["git_conn_id"] == "GITHUB__repo1"
assert bundle1["kwargs"]["subdir"] == "dags"
assert bundle1["kwargs"]["tracking_ref"] == "main"
assert bundle1["kwargs"]["refresh_interval"] == 60

# Verify bundle2
bundle2 = next((b for b in bundles if b["name"] == "bundle2"), None)
assert bundle2 is not None, "bundle2 not found"
assert bundle2["classpath"] == "airflow.providers.git.bundles.git.GitDagBundle"
assert bundle2["kwargs"]["git_conn_id"] == "GITHUB__repo2"
assert bundle2["kwargs"]["subdir"] == "dags"
assert bundle2["kwargs"]["tracking_ref"] == "develop"
assert bundle2["kwargs"]["refresh_interval"] == 120

# Verify dags-folder
dags_folder = next((b for b in bundles if b["name"] == "dags-folder"), None)
assert dags_folder is not None, "dags-folder not found"
assert dags_folder["classpath"] == "airflow.dag_processing.bundles.local.LocalDagBundle"
assert dags_folder["kwargs"] == {}