KubernetesPodOperator fails in deferrable mode when connection contains kube_config as dict #56947

@manipatnam

Description

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==10.8.2

Apache Airflow version

3.1

Operating System

debian

Deployment

Astronomer

Deployment details

No response

What happened

I created a simple KubernetesPodOperator (KPO) task with deferrable=True and supplied the cluster configuration through the Kubernetes connection, with kube_config stored as a dict in the connection extras.

The task failed with the following error:

[2025-10-21 11:01:15] ERROR - Task failed with exception source=task loc=task_runner.py:971
AirflowException: Traceback (most recent call last):
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run
    state = await self._wait_for_pod_start()
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 213, in _wait_for_pod_start
    pod = await self._get_pod()
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
  File "/usr/python/lib/python3.10/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 420, in exc_check
    raise retry_exc.reraise()
  File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 187, in reraise
    raise self.last_attempt.result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/python/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 276, in _get_pod
    pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 853, in get_pod
    async with self.get_conn() as connection:
  File "/usr/python/lib/python3.10/contextlib.py", line 199, in __aenter__
    return await anext(self.gen)
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 840, in get_conn
    kube_client = await self._load_config() or async_client.ApiClient()
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 805, in _load_config
    await temp_config.write(kubeconfig.encode())
AttributeError: 'dict' object has no attribute 'encode'
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 919 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1306 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 1632 in resume_execution

File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py", line 940 in trigger_reentry

What you think should happen instead

The hook should accept a dict in kube_config and run the task without errors.
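One way the expected behavior could be reached is to normalize the value before it is written to the temporary file. The sketch below is only an illustration: `kubeconfig_to_bytes` is a hypothetical helper, not part of the provider, and it assumes that serializing the dict as JSON yields content the Kubernetes client can read as a kubeconfig file.

```python
import json
from typing import Any


def kubeconfig_to_bytes(kubeconfig: Any) -> bytes:
    """Return kubeconfig content as UTF-8 bytes, accepting a str or a dict.

    Hypothetical helper for illustration; the provider's actual fix may differ.
    """
    if isinstance(kubeconfig, dict):
        # A dict has no .encode(); serialize it first.
        # Assumption: JSON output is acceptable as kubeconfig file content.
        kubeconfig = json.dumps(kubeconfig)
    if not isinstance(kubeconfig, str):
        raise TypeError(
            f"kube_config must be str or dict, got {type(kubeconfig).__name__}"
        )
    return kubeconfig.encode("utf-8")
```

With a helper like this, the failing call in `_load_config` could write `kubeconfig_to_bytes(kubeconfig)` instead of `kubeconfig.encode()`, so both string and dict values follow the same path.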

How to reproduce

Create a simple DAG:

import os
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from pendulum import datetime

# Temporarily unset KUBECONFIG to force using connection config
if 'KUBECONFIG' in os.environ:
    del os.environ['KUBECONFIG']

with DAG(
    dag_id="kpo_localexec_astro_deferrable",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    
    echo_task = KubernetesPodOperator(
        task_id="echo",
        name="echo",
        namespace="default",
        kubernetes_conn_id="kubernetes_eks_like",
        image="alpine:3.19",
        cmds=["sh", "-c"],
        arguments=["echo hello from KPO && uname -a"],
        get_logs=True,
        config_file=None,
        is_delete_operator_pod=True,
        deferrable=True,
    )

Create a basic Kubernetes connection, leave every other field at its default, and add kube_config to the extras as follows:

{
  "kube_config": {
    "apiVersion": "v1",
    "clusters": [
      {
        "cluster": {
          "server": "https://host.docker.internal:55413",
          "insecure-skip-tls-verify": true
        },
        "name": "kind-airflow-kpo"
      }
    ],
    "contexts": [
      {
        "context": {
          "cluster": "kind-airflow-kpo",
          "user": "kind-airflow-kpo"
        },
        "name": "kind-airflow-kpo"
      }
    ],
    "current-context": "kind-airflow-kpo",
    "kind": "Config",
    "users": [
      {
        "name": "kind-airflow-kpo",
        "user": {
          "client-certificate-data": "abcd",
          "client-key-data": "abcd"
        }
      }
    ]
  }
}
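Because the traceback ends in `kubeconfig.encode()`, the deferrable path appears to expect kube_config as a string. A possible workaround, not verified against the provider, is to store the kubeconfig as a JSON-encoded string inside the extras instead of a nested object. The snippet below sketches how such an extras value could be built; the dict is abbreviated and the variable names are illustrative only.

```python
import json

# Abbreviated kubeconfig; use the full structure from the connection above.
kube_config = {
    "apiVersion": "v1",
    "kind": "Config",
    "current-context": "kind-airflow-kpo",
}

# Store kube_config as a JSON string rather than a nested dict,
# so code that calls .encode() on it receives a str.
extras = {"kube_config": json.dumps(kube_config)}

# Value to paste into the connection's Extra field.
extra_field = json.dumps(extras)
print(extra_field)
```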

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
