kubernetes.client.exceptions.ApiException: (404) Reason: Not Found (driver pod is not found) #42132

@captify-mkambur

Description

Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I'm trying to run a Spark application with a custom image using Airflow 2.10.0, using a Kubernetes cluster connection (EKS). The apache-airflow-providers-cncf-kubernetes version is 8.3.4. When I deploy the SparkApplication resource to the cluster manually (via kubectl), the application works fine and completes without errors. When I schedule the DAG, it starts fine and runs without interruption almost to the end, but then fails with a 404 error. It seems it cannot communicate with the driver; however, when I check the cluster, the driver pod is still there and looks fine.
The error text:

[2024-09-10, 10:16:22 UTC] {taskinstance.py:3301} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 293, in execute
    return super().execute(context=context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 634, in execute_sync
    self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 727, in await_xcom_sidecar_container_start
    if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 645, in container_is_running
    remote_pod = self.read_pod(pod)
                 ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 418, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 185, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 720, in read_pod
    return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23693, in read_namespaced_pod
    return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23780, in read_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b75ef5bb-a810-46c8-a052-6b42307c5229', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '034ee985-68e4-4ac9-ba26-c2b9e6b4cd1d', 'X-Kubernetes-Pf-Prioritylevel-Uid': '2e7745b6-a7a8-493c-99ec-ce7f9c416cff', 'Date': 'Tue, 10 Sep 2024 10:16:22 GMT', 'Content-Length': '270'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"<my-spark-app>-task-qyzuffwi-driver\" not found","reason":"NotFound","details":{"name":"<my-spark-app>-task-qyzuffwi-driver","kind":"pods"},"code":404}
[2024-09-10, 10:16:22 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=<my-spark-app>, task_id=<my-spark-app>-task, run_id=scheduled__2024-09-10T09:15:00+00:00, execution_date=20240910T091500, start_date=20240910T091612, end_date=20240910T101622
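
For reference, the driver pod named in the 404 body can be queried directly with the Kubernetes Python client (a minimal diagnostic sketch; it assumes kubeconfig credentials for the same EKS cluster, and the pod and namespace names are taken from the logs above):

from kubernetes import client, config

# Load credentials from the local kubeconfig; inside the cluster,
# config.load_incluster_config() would be used instead.
config.load_kube_config()

v1 = client.CoreV1Api()
pod = v1.read_namespaced_pod(
    name="<my-spark-app>-task-qyzuffwi-driver",
    namespace="default",
)
print(pod.status.phase, [c.name for c in (pod.status.container_statuses or [])])

An equivalent check against the cluster returns the pod, which is why the 404 raised by the operator is unexpected.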

What do you think should happen instead?

The DAG run should finish with the "success" status.

How to reproduce

Please keep in mind that all values in angle brackets are not real; they were changed for security reasons. If you need any other information, e.g. Airflow config values, please let me know.
The DAG file looks like this:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

default_args = {
    'owner': '<me>',
    'depends_on_past': False,
    'start_date': '2024-09-06',
    'email': ['<my@mail>'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_tis_per_dag': 1,  # limits concurrent running task instances across dag_runs per task
}

with DAG(
    '<my-spark-app>',
    default_args=default_args,
    schedule_interval="*/10 * * * *",
    max_active_runs=1,  # DAG-level argument, so set here rather than in default_args
    # catchup=False,
    tags=['<my-spark-app>']
) as dag:

    spark_task = SparkKubernetesOperator(
        task_id="<my-spark-app>-task",
        application_file='<my-spark-app>.yaml',
        dag=dag,
        namespace='default',
        kubernetes_conn_id='<EKS_CONN>',
        do_xcom_push=True,
        params={"app_name": "<my-spark-app>"}
    )

    sensor = SparkKubernetesSensor(
        task_id='<my-spark-app>-monitor',
        namespace="default",
        application_name="{{ task_instance.xcom_pull(task_ids='<my-spark-app>-task')['metadata']['name'] }}",
        kubernetes_conn_id="<EKS_CONN>",
        dag=dag,
        api_group="sparkoperator.k8s.io",
        api_version='v1beta2',
        attach_log=True
    )

    spark_task >> sensor
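
A possible mitigation (my assumption, not a confirmed fix): as far as I can tell from the provider source, await_xcom_sidecar_container_start, where the traceback fails, only runs when do_xcom_push=True, so disabling the XCom push should skip the poll that raises the 404:

spark_task = SparkKubernetesOperator(
    task_id="<my-spark-app>-task",
    application_file='<my-spark-app>.yaml',
    namespace='default',
    kubernetes_conn_id='<EKS_CONN>',
    do_xcom_push=False,  # skips the XCom sidecar poll where the 404 is raised
)

The trade-off is that the sensor can then no longer pull the application name from XCom, so application_name would have to come from somewhere else (e.g. hard-coded to the name in the YAML).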

The SparkApplication resource YAML looks like this:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  labels: &Labels
    app: <my-spark-app>
  name: <my-spark-app>
spec:
  type: Python
  restartPolicy:
    type: Never
  pythonVersion: "3"
  sparkVersion: "3.5.0"
  mode: cluster
  image: <my-ecr-repo>:<my-spark-app>
  mainApplicationFile: local:///<my-spark-app>/main.py
  sparkConf:
    spark.ui.port: "4040"
    spark.ui.showConsoleProgress: "true"
    spark.sql.broadcastTimeout: "6000"
    spark.hadoop.fs.s3a.multiobjectdelete.enable: "false"
    spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled: "true"
    spark.metrics.namespace: '<my-spark-app>-qa'
    spark.metrics.conf.*.sink.graphite.host: <graphite-exporter-address>
    spark.metrics.conf.*.sink.graphite.port: "<graphite-port>"
    spark.metrics.conf.*.sink.graphite.class: "org.apache.spark.metrics.sink.GraphiteSink"
    spark.metrics.conf.*.sink.graphite.period: "10"
    spark.metrics.conf.*.sink.graphite.unit": "seconds"
    spark.metrics.appStatusSource.enabled: "true"
    spark.driver.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
    spark.executor.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
    spark.metrics.conf.driver.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.executor.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.worker.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.master.source.jvm.class: org.apache.spark.metrics.source.JvmSource
  hadoopConf:
    spark.hadoop.fs.s3a.user.agent.prefix: '<my-spark-app>-qa'
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.bucket.all.committer.magic.enabled: "true"
    fs.s3a.endpoint: http://s3.eu-west-1.amazonaws.com
    fs.s3a.connection.ssl.enabled: "false"
  executor:
    nodeSelector:
      karpenter.sh/nodepool: default
    labels: *Labels
    serviceAccount: spark-operator-spark
    cores: 8
    coreRequest: "7500m"
    instances: 1
    memory: "40g"
  driver:
    nodeSelector:
      karpenter.sh/nodepool: ondemand
    labels: *Labels
    serviceAccount: spark-operator-spark
    cores: 4
    memory: "16g"
    env:
      - name: DEPLOY_ENV
        value: qa
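
Independently of the driver pod, the SparkApplication status can be read through the CustomObjectsApi, which is essentially what the SparkKubernetesSensor polls (a minimal sketch, assuming kubeconfig credentials and the resource name from the YAML above):

from kubernetes import client, config

config.load_kube_config()

api = client.CustomObjectsApi()
app = api.get_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="default",
    plural="sparkapplications",
    name="<my-spark-app>",
)
# The Spark operator records progress under status.applicationState.state,
# e.g. RUNNING or COMPLETED.
print(app.get("status", {}).get("applicationState", {}).get("state"))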

Operating System

Airflow runs on an EKS cluster and was deployed using the Helm chart (version 1.16.0-dev).

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes -- 8.3.4

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct