Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==8.4.2
Apache Airflow version
2.10.3
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
When a pod launched by KubernetesPodOperator (KPO) is evicted due to node-level disk pressure (or for any other reason) and the task is configured with retries, the second try (the first retry) fails because KPO finds the evicted pod, treats it as already existing, and tries to reuse it. This results in a 400 Bad Request from the Kubernetes API when KPO attempts to fetch logs from a container (base) that is not available in the evicted pod.
testdag-testtask-474-60z3cjb3 0/1 Evicted 0 20h
During the second try:
Found matching pod testdag-testtask-474-60z3cjb3 with labels {'airflow_kpo_in_cluster': 'True', 'airflow_version': '2.10.3', 'app': 'airflow', 'dag_id': 'testdag', 'kubernetes_pod_operator': 'True', 'run_id': 'scheduled__2025-07-02T0500000000-7534437cd', 'task_id': 'testtask', 'try_number': '1'}
K8s exception:
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '02839004-5cb2-4628-81d8-51dfc8d9e079', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Thu, 03 Jul 2025 05:22:24 GMT', 'Content-Length': '212'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"container \"base\" in pod \"testdag-testtask-474-60z3cjb3\" is not available","reason":"BadRequest","code":400}\n'
[2025-07-03, 05:22:24 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED.
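The matched pod still carries try_number '1' even though it was found during the second try, which suggests the lookup does not filter on try_number. Below is a minimal sketch of that behaviour, assuming a selector built only from the task-identifying labels. `build_selector` and `matches` are hypothetical helpers written for illustration; they are not the provider's code.

```python
# Hypothetical illustration; not the provider's actual implementation.
# Labels copied from the evicted pod in the log above (subset).
POD_LABELS = {
    "dag_id": "testdag",
    "task_id": "testtask",
    "run_id": "scheduled__2025-07-02T0500000000-7534437cd",
    "kubernetes_pod_operator": "True",
    "try_number": "1",
}


def build_selector(labels, exclude=("try_number",)):
    """Build an equality-based label selector, skipping excluded keys."""
    return ",".join(
        f"{key}={value}"
        for key, value in sorted(labels.items())
        if key not in exclude
    )


def matches(selector, pod_labels):
    """Return True if every key=value clause in the selector matches the pod."""
    clauses = (clause.split("=", 1) for clause in selector.split(",") if clause)
    return all(pod_labels.get(key) == value for key, value in clauses)


# Labels the task instance would carry on its second try.
retry_labels = {**POD_LABELS, "try_number": "2"}

# Selector without try_number: the pod from try 1 still matches.
print(matches(build_selector(retry_labels), POD_LABELS))  # True

# Selector with try_number: the pod from try 1 no longer matches.
print(matches(build_selector(retry_labels, exclude=()), POD_LABELS))  # False
```

If the lookup works roughly like this, the evicted pod from try 1 will keep matching on every retry until it is deleted or explicitly excluded.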
What you think should happen instead
When a pod is evicted:
- It should not be reused by KPO in subsequent retries.
- The operator should recognise the pod’s status (Evicted) and launch a new pod.
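The expected behaviour could be sketched as a guard that runs before a matching pod is reused. This is only a minimal sketch, assuming the found pod's status is available as a plain dict with `phase` and `reason` keys (mirroring the Kubernetes pod status fields). `is_evicted` and `should_launch_new_pod` are hypothetical names, not existing operator methods.

```python
from typing import Optional


def is_evicted(status: dict) -> bool:
    """Evicted pods report phase 'Failed' with reason 'Evicted'."""
    return status.get("phase") == "Failed" and status.get("reason") == "Evicted"


def should_launch_new_pod(found_status: Optional[dict]) -> bool:
    """Decide whether a retry should create a fresh pod.

    found_status is None when no matching pod exists, otherwise the
    status of the pod returned by the label-selector lookup.
    """
    if found_status is None:
        return True
    # An evicted pod has no usable 'base' container, so do not reuse it.
    return is_evicted(found_status)


print(should_launch_new_pod({"phase": "Failed", "reason": "Evicted"}))  # True
print(should_launch_new_pod({"phase": "Running"}))  # False
print(should_launch_new_pod(None))  # True
```

How other terminal states should be handled is left open here; the sketch only covers the Evicted case described in this issue.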
How to reproduce
Step 1: Deploy a Standalone Pod from Airflow (via KPO)
Create a simple test DAG using KubernetesPodOperator.
Example:
from datetime import datetime, timedelta

from airflow import DAG
# operators.pod is the current module path; operators.kubernetes_pod is deprecated.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(
    "kpo_evict_test",
    start_date=datetime(2025, 7, 8),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Long-running pod so there is time to evict it while the task is running.
    test_task = KubernetesPodOperator(
        task_id="evict_me",
        name="evict-me-pod",
        namespace="default",
        image="busybox",
        cmds=["sh", "-c", "sleep 6000"],
        retries=1,
        retry_delay=timedelta(minutes=1),
        get_logs=True,
    )
Step 2: Trigger the DAG and Wait for the Pod to Start
Watch the pod:
kubectl get pods -n default -l task_id=evict_me -w
Wait until the pod is in the Running or Pending state.
Step 3: Manually Evict the Pod
Once the pod is up (e.g., evict-me-pod-), run:
kubectl evict pod/<pod-name> --namespace default --force
Note: stock kubectl has no evict subcommand, so this assumes a plugin or alias that provides one.
This will evict the pod, and its status will become:
kubectl get pod <pod-name>
# Output:
# NAME                  READY   STATUS    RESTARTS   AGE
# evict-me-pod-abc123   0/1     Evicted   0          3m
Step 4: Watch the Retry Attempt in Airflow
- Airflow will detect that the task failed and will retry.
- On retry, KPO will find the old evicted pod (via label selector).
- It will attempt to reuse it or fetch logs from the base container (which doesn’t exist anymore).
- This results in a 400 Bad Request, and the task is marked FAILED.
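Until the operator handles this itself, one possible stopgap is to delete evicted pods that match the task's labels before the retry runs. The sketch below assumes an `api` object that behaves like `kubernetes.client.CoreV1Api` (only `list_namespaced_pod` and `delete_namespaced_pod` are used). `delete_evicted_pods`, `FakeApi`, `make_pod`, and the pod name `evict-me-pod-def456` are hypothetical and exist only so the example runs without a cluster.

```python
from types import SimpleNamespace


def delete_evicted_pods(api, namespace, label_selector):
    """Delete pods matching the selector whose status reason is 'Evicted'.

    Returns the names of the pods that were deleted.
    """
    deleted = []
    pods = api.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
    for pod in pods.items:
        if pod.status.reason == "Evicted":
            api.delete_namespaced_pod(name=pod.metadata.name, namespace=namespace)
            deleted.append(pod.metadata.name)
    return deleted


def make_pod(name, phase, reason=None):
    """Build a minimal stand-in for a pod object (hypothetical helper)."""
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name),
        status=SimpleNamespace(phase=phase, reason=reason),
    )


class FakeApi:
    """In-memory stand-in for the two API calls used above.

    It ignores label_selector and returns every pod it holds.
    """

    def __init__(self, pods):
        self.pods = list(pods)

    def list_namespaced_pod(self, namespace, label_selector):
        return SimpleNamespace(items=list(self.pods))

    def delete_namespaced_pod(self, name, namespace):
        self.pods = [pod for pod in self.pods if pod.metadata.name != name]


api = FakeApi(
    [
        make_pod("evict-me-pod-abc123", "Failed", "Evicted"),
        make_pod("evict-me-pod-def456", "Running"),
    ]
)
print(delete_evicted_pods(api, "default", "task_id=evict_me"))  # ['evict-me-pod-abc123']
```

Against a real cluster, the same function could be called with a configured `kubernetes.client.CoreV1Api()` instance in place of `FakeApi`.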
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct