Closed
Labels
area:providers, kind:bug, needs-triage, provider:cncf-kubernetes
Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==10.8.2
Apache Airflow version
3.1
Operating System
debian
Deployment
Astronomer
Deployment details
No response
What happened
I created a simple KubernetesPodOperator (KPO) task with deferrable=True and supplied the kube config as a dict under kube_config in the Kubernetes connection extras.
The task failed with the following error:
[2025-10-21 11:01:15] ERROR - Task failed with exception source=task loc=task_runner.py:971
AirflowException: Traceback (most recent call last):
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run
    state = await self._wait_for_pod_start()
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 213, in _wait_for_pod_start
    pod = await self._get_pod()
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 420, in exc_check
    raise retry_exc.reraise()
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 187, in reraise
    raise self.last_attempt.result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 276, in _get_pod
    pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 853, in get_pod
    async with self.get_conn() as connection:
  File "/usr/python/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 840, in get_conn
    kube_client = await self._load_config() or async_client.ApiClient()
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 805, in _load_config
    await temp_config.write(kubeconfig.encode())
AttributeError: 'dict' object has no attribute 'encode'
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 919 in run
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1306 in _execute_task
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 1632 in resume_execution
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py", line 940 in trigger_reentry
What you think should happen instead
The deferrable path should accept a dict-valued kube_config from the connection and run without errors.
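The traceback ends at `temp_config.write(kubeconfig.encode())` with a dict in hand, so one way to get the expected behavior would be to serialize a dict-valued `kube_config` before encoding it. The following is a minimal sketch of that idea, not the provider's actual code: `kubeconfig_to_bytes` is a hypothetical helper name, and it assumes the temporary file is later read by a YAML loader, which in practice accepts JSON text.

```python
import json
from typing import Any, Optional


def kubeconfig_to_bytes(kubeconfig: Any) -> Optional[bytes]:
    """Hypothetical helper: return bytes suitable for writing to a temp kubeconfig file."""
    if kubeconfig is None:
        # Nothing configured on the connection; the caller falls back to other sources.
        return None
    if isinstance(kubeconfig, dict):
        # A dict has no .encode(); serialize it to JSON text first.
        kubeconfig = json.dumps(kubeconfig)
    return kubeconfig.encode()


# A dict (the failing case in this report) and a string both yield bytes.
as_bytes = kubeconfig_to_bytes({"apiVersion": "v1", "kind": "Config"})
as_bytes_from_str = kubeconfig_to_bytes('{"apiVersion": "v1"}')
```

Normalizing in one place keeps the string case unchanged while making the dict case behave the same way.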
How to reproduce
Create a simple DAG:
import os

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from pendulum import datetime

# Temporarily unset KUBECONFIG to force using connection config
if 'KUBECONFIG' in os.environ:
    del os.environ['KUBECONFIG']

with DAG(
    dag_id="kpo_localexec_astro_deferrable",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    echo_task = KubernetesPodOperator(
        task_id="echo",
        name="echo",
        namespace="default",
        kubernetes_conn_id="kubernetes_eks_like",
        image="alpine:3.19",
        cmds=["sh", "-c"],
        arguments=["echo hello from KPO && uname -a"],
        get_logs=True,
        config_file=None,
        is_delete_operator_pod=True,
        deferrable=True,
    )
Create a basic Kubernetes connection, leave every field at its default, and add kube_config to the extras:
{
  "kube_config": {
    "apiVersion": "v1",
    "clusters": [
      {
        "cluster": {
          "server": "https://host.docker.internal:55413",
          "insecure-skip-tls-verify": true
        },
        "name": "kind-airflow-kpo"
      }
    ],
    "contexts": [
      {
        "context": {
          "cluster": "kind-airflow-kpo",
          "user": "kind-airflow-kpo"
        },
        "name": "kind-airflow-kpo"
      }
    ],
    "current-context": "kind-airflow-kpo",
    "kind": "Config",
    "users": [
      {
        "name": "kind-airflow-kpo",
        "user": {
          "client-certificate-data": "abcd",
          "client-key-data": "abcd"
        }
      }
    ]
  }
}
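On provider versions affected by this error, a possible workaround is to store `kube_config` as a JSON string rather than a nested object, so that the value read from the connection already supports `encode`. This is an untested sketch under that assumption: `build_connection_extra` is a hypothetical helper, and the shortened config stands in for the full one in the reproduction steps.

```python
import json


def build_connection_extra(kube_config: dict) -> str:
    """Hypothetical helper: build connection extras with kube_config as a JSON string."""
    # The inner json.dumps turns the config into a string value;
    # the outer json.dumps produces the extras JSON document itself.
    return json.dumps({"kube_config": json.dumps(kube_config)})


# Shortened stand-in for the kube config used in this report.
sample_config = {
    "apiVersion": "v1",
    "kind": "Config",
    "current-context": "kind-airflow-kpo",
}

extra = build_connection_extra(sample_config)
stored_value = json.loads(extra)["kube_config"]
```

Here `stored_value` is a `str`, so a call such as `stored_value.encode()` would not raise the `AttributeError` shown in the traceback.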
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
johnhoran