
KubernetesJobOperator fails if you launch more than one pod #44994

@osintalex

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

8.4.1

Apache Airflow version

2.10.2

Operating System

Not sure - in GCP cloud composer

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

If you launch a KubernetesJobOperator, it tries to find the job's pod after execution. If the job has launched multiple pods, it then fails when trying to log, because the pod lookup does not expect more than one matching pod.

I can pinpoint the place in the source code if you give me a link. I couldn't find it on GitHub, but I have identified it locally.

It's when raise AirflowException(f"More than one pod running with labels {label_selector}") gets called.

What you think should happen instead

There should be a flag in the job operator constructor that prevents this behaviour, since many k8s jobs launch more than one pod. Alternatively, the find_pod logic should be smart enough to expect more than one pod when the job has a parallelism count greater than one.
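To make the second suggestion concrete, here is a minimal sketch of a parallelism-aware check. It is not the provider's code: `select_pods`, `MultiplePodsError`, and the `expected_parallelism` parameter are hypothetical names, and pods are represented as plain strings so the snippet runs without Airflow or a cluster.

```python
from __future__ import annotations

from typing import Sequence


class MultiplePodsError(RuntimeError):
    """Hypothetical stand-in for the AirflowException mentioned above."""


def select_pods(
    pods: Sequence[str],
    label_selector: str,
    expected_parallelism: int = 1,
) -> list[str]:
    """Return the matched pods, tolerating up to `expected_parallelism` of them.

    With the default of 1 this mirrors the strict behaviour described in the
    issue; a job with parallelism N would pass expected_parallelism=N.
    """
    limit = max(expected_parallelism, 1)
    if len(pods) > limit:
        raise MultiplePodsError(
            f"Found {len(pods)} pods with labels {label_selector}, "
            f"expected at most {limit}"
        )
    return list(pods)
```

With the default limit, two matching pods still raise; passing the job's parallelism lets the same lookup succeed.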

How to reproduce

Launch a KubernetesJobOperator with parallelism > 1; I hit it every time. This seems so basic that I wonder whether I am doing something wrong, since I would have expected other people to have run into it already.

I am running indexed jobs with completions equal to parallelism count.

Full config looks like this:

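from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from kubernetes.client import models as k8s_models

my_task = KubernetesJobOperator(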
    backoff_limit=18,
    wait_until_job_complete=True,
    completion_mode="Indexed",
    completions=PARALLELISM,
    parallelism=PARALLELISM,
    ttl_seconds_after_finished=60 * 30,
    reattach_on_restart=False,
    get_logs=False,
    labels={
        "app.kubernetes.io/type": "<my pdb selector>",
    },
    job_poll_interval=60,
    config_file="/home/airflow/composer_kube_config",
    task_id="my_task",
    namespace="composer-user-workloads",
    name="my_task",
    image=IMAGE_NAME,
    cmds=["<my command>"],
    arguments=["<my args>"],
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "250m", "memory": "512Mi"},
        limits={"memory": "512Mi"},
    ),
    kubernetes_conn_id="kubernetes_default",
    retries=1,
)

Anything else

I have worked around this issue by setting reattach_on_restart to False, which avoids the labels issue but has the side effect of leaving a stray pod. I then delete that with a PythonOperator that makes use of the k8s hook:


from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from kubernetes.client import models as k8s_models


def delete_all_pods_from_job():
    # Connect with the same connection and kube config the job operator uses.
    k8s_hook = KubernetesHook(
        conn_id="kubernetes_default",
        config_file="/home/airflow/composer_kube_config",
    )
    # Find every pod carrying the job's label.
    pod_list: k8s_models.V1PodList = k8s_hook.core_v1_client.list_namespaced_pod(
        namespace="composer-user-workloads", label_selector="mylabel=true"
    )
    print(f"found {len(pod_list.items)} pods to delete")
    for pod in pod_list.items:
        print(f"Deleting pod with name {pod.metadata.name}")
        k8s_hook.core_v1_client.delete_namespaced_pod(
            name=pod.metadata.name, namespace=pod.metadata.namespace
        )


cleanup_pods = PythonOperator(
    task_id="cleanup_pods",
    python_callable=delete_all_pods_from_job,
)
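A possible variation on the cleanup above, sketched under assumptions: instead of a custom label, it selects pods by the `job-name` label that the Kubernetes Job controller adds to the pods it creates. `delete_pods_for_job` is a hypothetical helper, and `job_name` must be the name of the Job object as it exists in the cluster, which may differ from the `name` passed to the operator. The client is passed in, so `k8s_hook.core_v1_client` from the snippet above could be supplied.

```python
from __future__ import annotations


def delete_pods_for_job(core_v1, namespace: str, job_name: str) -> list[str]:
    """Delete every pod labelled as belonging to `job_name`; return their names.

    `core_v1` is expected to behave like kubernetes.client.CoreV1Api
    (assumption: only list_namespaced_pod and delete_namespaced_pod are used).
    """
    # The Job controller labels its pods with job-name=<name of the Job>.
    selector = f"job-name={job_name}"
    pod_list = core_v1.list_namespaced_pod(
        namespace=namespace, label_selector=selector
    )
    deleted = []
    for pod in pod_list.items:
        core_v1.delete_namespaced_pod(
            name=pod.metadata.name, namespace=pod.metadata.namespace
        )
        deleted.append(pod.metadata.name)
    return deleted
```

Scoping the selector to one Job avoids deleting pods from other jobs that happen to share a custom label.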

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
