-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
8.4.1
Apache Airflow version
2.10.2
Operating System
Not sure - in GCP cloud composer
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
If you launch a kubernetes job operator, it tries to find the pod after execution. If the job has launched multiple pods, it then fails when trying to log since it can't find more than one pod.
I can pinpoint the place in the source code if you give me a link, I just can't find where it is on github but have identified it locally.
It's when raise AirflowException(f"More than one pod running with labels {label_selector}") gets called.
What you think should happen instead
Should be able to have some flag in the job operator constructor that prevents this behaviour from happening - many k8s jobs will launch more than one pods; or the find_pod logic is smart enough to know you will have more than one pod when your job has a large parallellism count.
How to reproduce
Launch a KubernetesJobOperator with parallelism > 1 and I hit it every time. This seems so basic though that I wonder if I am doing something wrong since I would have expected other people to run into it if that was the case.
I am running indexed jobs with completions equal to parallelism count.
Full config looks like this:
my_task = KubernetesJobOperator(
backoff_limit=18,
wait_until_job_complete=True,
completion_mode="Indexed",
completions=PARALLELISM,
parallelism=PARALLELISM,
ttl_seconds_after_finished=60 * 30,
reattach_on_restart=False,
get_logs=False,
labels={
"app.kubernetes.io/type": "<my pdb selector>",
},
job_poll_interval=60,
config_file="/home/airflow/composer_kube_config",
task_id="my_task",
namespace="composer-user-workloads",
name="my_task",
image=IMAGE_NAME,
cmds=["<my command>"],
arguments=["<my args"],
container_resources=k8s_models.V1ResourceRequirements(
requests={"cpu": "250m", "memory": "512Mi"},
limits={"memory": "512Mi"},
),
kubernetes_conn_id="kubernetes_default",
retries=1,
)
Anything else
I have fixed this issue by setting reattach_on_restart to False, which prevents the labels issue but has side effect of producing a stray pod. I then delete that with a python operator that makes use of the k8s hook:
def delete_all_pods_from_job():
k8s_hook = KubernetesHook(
conn_id="kubernetes_default",
config_file="/home/airflow/composer_kube_config",
)
pod_list: k8s_models.V1PodList = k8s_hook.core_v1_client.list_namespaced_pod(
namespace="composer-user-workloads", label_selector="mylabel=true"
)
print(f"found {len(pod_list.items)} pods to delete")
for pod in pod_list.items:
print(f"Deleting pod with name {pod.metadata.name}")
k8s_hook.core_v1_client.delete_namespaced_pod(name=pod.metadata.name, namespace=pod.metadata.namespace)
cleanup_pods = PythonOperator(
task_id="cleanup_pods",
python_callable=delete_all_pods_from_job,
)
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct