Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chart/templates/_helpers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ Usage:
{{- end -}}

{{/*
Convert dagBundleConfigList YAML list to JSON string for dag_bundle_config_list.
Convert `dagBundleConfigList` YAML list to JSON string for `dag_bundle_config_list`.
This helper function converts the structured YAML format to the JSON string
format required by Airflow's dag_bundle_config_list configuration.

Expand Down
219 changes: 142 additions & 77 deletions chart/templates/dag-processor/dag-processor-deployment.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,97 @@
under the License.
*/}}

################################
## Airflow Dag Processor PodDisruptionBudget
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- $enabled := .Values.dagProcessor.enabled }}
{{- if eq $enabled nil}}
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{/*
Helper template for dag processor PodDisruptionBudget
Expects context with:
- .Values.dagProcessor: dag processor configuration (may be merged with bundle overrides)
- .bundleName: bundle name (empty string if not per-bundle mode)
- .deployPerBundle: boolean indicating if per-bundle mode is enabled
*/}}
{{- define "dag-processor.poddisruptionbudget" }}
{{- $bundleName := .bundleName | default "" }}
{{- $deployPerBundle := .deployPerBundle | default false }}
{{- $mergedConfig := .Values.dagProcessor }}
{{- $bundleOverrides := .bundleOverrides | default dict }}
{{- if $mergedConfig.podDisruptionBudget.enabled }}
{{- /* Determine PDB name suffix */}}
{{- $nameSuffix := "" }}
{{- if $deployPerBundle }}
{{- $nameSuffix = printf "-%s-pdb" $bundleName }}
{{- else }}
{{- $nameSuffix = "-pdb" }}
{{- end }}
{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }}
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: {{ include "airflow.fullname" . }}-dag-processor-pdb
name: {{ include "airflow.fullname" . }}-dag-processor{{ $nameSuffix }}
labels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
{{- if or (.Values.labels) (.Values.dagProcessor.labels) }}
{{- mustMerge .Values.dagProcessor.labels .Values.labels | toYaml | nindent 4 }}
{{- if $deployPerBundle }}
bundle: {{ $bundleName }}
{{- end }}
{{- if or (.Values.labels) ($mergedConfig.labels) }}
{{- mustMerge $mergedConfig.labels .Values.labels | toYaml | nindent 4 }}
{{- end }}
spec:
selector:
matchLabels:
tier: airflow
component: dag-processor
release: {{ .Release.Name }}
{{- toYaml .Values.dagProcessor.podDisruptionBudget.config | nindent 2 }}
{{- if $deployPerBundle }}
bundle: {{ $bundleName }}
{{- end }}
{{- $pdbConfig := $mergedConfig.podDisruptionBudget.config }}
{{- if and $bundleOverrides.podDisruptionBudget $bundleOverrides.podDisruptionBudget.config }}
{{- $pdbConfig = $bundleOverrides.podDisruptionBudget.config }}
{{- end }}
{{- if and (hasKey $pdbConfig "minAvailable") (hasKey $pdbConfig "maxUnavailable") }}
{{- if ne (index $pdbConfig "minAvailable" | toString) "<nil>" }}
{{- $pdbConfig = omit $pdbConfig "maxUnavailable" }}
{{- end }}
{{- end }}
{{- toYaml $pdbConfig | nindent 2 }}
{{- end }}
{{- end }}

################################
## Airflow Dag Processor PodDisruptionBudget
#################################
{{- if semverCompare ">=2.3.0" .Values.airflowVersion }}
{{- $enabled := .Values.dagProcessor.enabled }}
{{- if eq $enabled nil}}
{{ $enabled = ternary true false (semverCompare ">=3.0.0" .Values.airflowVersion) }}
{{- end }}
{{- $deployPerBundle := .Values.dagProcessor.deployPerBundle.enabled | default false }}
{{- if and $enabled .Values.dagProcessor.podDisruptionBudget.enabled }}
{{- if not $deployPerBundle }}
{{- /* Single PDB mode: use base dagProcessor config */}}
{{- $context := dict "Values" (merge (dict "dagProcessor" .Values.dagProcessor) .Values) "Release" .Release "Chart" .Chart "Template" .Template "Files" .Files "bundleName" "" "deployPerBundle" false }}
{{- include "dag-processor.poddisruptionbudget" $context }}
{{- else }}
{{- /* Per-bundle PDB mode: create one PDB per bundle */}}
{{- $firstPDB := true }}
{{- range $bundle := .Values.dagProcessor.dagBundleConfigList }}
{{- $bundleName := $bundle.name }}
{{- $bundleOverrides := index $.Values.dagProcessor.deployPerBundle.bundleOverrides $bundleName | default dict | deepCopy }}
{{- $baseConfig := $.Values.dagProcessor | deepCopy }}
{{- $mergedConfig := mergeOverwrite $baseConfig $bundleOverrides }}
{{- $_ := set $mergedConfig "dagBundleConfigList" $.Values.dagProcessor.dagBundleConfigList }}
{{- if $mergedConfig.podDisruptionBudget.enabled }}
{{- if not $firstPDB }}
---
{{- end }}
{{- $firstPDB = false }}
{{- $bundleValues := merge (dict "dagProcessor" $mergedConfig) $.Values }}
{{- $context := dict "Values" $bundleValues "Release" $.Release "Chart" $.Chart "Template" $.Template "Files" $.Files "bundleName" $bundleName "deployPerBundle" true "bundleOverrides" $bundleOverrides }}
{{- include "dag-processor.poddisruptionbudget" $context }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
42 changes: 42 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4438,6 +4438,48 @@
}
}
},
"deployPerBundle": {
"description": "Per-bundle deployment option. When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`.",
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"description": "Enable per-bundle deployments. When true, creates a separate deployment for each bundle.",
"type": "boolean",
"default": false
},
"args": {
"description": "Command args template for per-bundle deployments. `{{ bundleName }}` will be replaced with the actual bundle name.",
"type": "array",
"default": [
"bash",
"-c",
"exec airflow dag-processor --bundle-name {{ bundleName }}"
],
"items": {
"type": "string"
}
},
"bundleOverrides": {
"description": "Per-bundle specific overrides. Each key should match a bundle name from `dagBundleConfigList`.",
"type": "object",
"additionalProperties": {
"type": "object",
"additionalProperties": {}
},
"default": {}
}
},
"default": {
"enabled": false,
"args": [
"bash",
"-c",
"exec airflow dag-processor --bundle-name {{ bundleName }}"
],
"bundleOverrides": {}
}
},
"livenessProbe": {
"description": "Liveness probe configuration for dag processor.",
"type": "object",
Expand Down
33 changes: 33 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2295,6 +2295,39 @@ dagProcessor:
# classpath: "airflow.dag_processing.bundles.local.LocalDagBundle"
# kwargs: {}

# Per-bundle deployment option
# When enabled, creates a separate deployment for each bundle in `dagBundleConfigList`
deployPerBundle:
enabled: false
# Command args template for per-bundle deployments
# `{{ bundleName }}` will be replaced with the actual bundle name
args: ["bash", "-c", "exec airflow dag-processor --bundle-name {{ bundleName }}"]

# Per-bundle specific overrides (optional)
# Each key should match a bundle name from `dagBundleConfigList`
# These settings override the base dagProcessor configuration for each bundle
bundleOverrides: {}
# Example: Override settings for a specific bundle
# bundleOverrides:
# bundle1:
# replicas: 3
# resources:
# requests:
# memory: "2Gi"
# cpu: "1000m"
# limits:
# memory: "4Gi"
# cpu: "2000m"
# nodeSelector:
# workload-type: production
# env:
# - name: SOME_ENV_VAR
# value: "20"
# podDisruptionBudget:
# enabled: true
# config:
# minAvailable: 1

# Number of airflow dag processors in the deployment
replicas: 1
# Max number of old replicasets to retain
Expand Down
Loading
Loading