Description
Apache Airflow version
2.10.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I'm trying to run a Spark application with a custom image using Airflow 2.10.0. I use a Kubernetes cluster connection (EKS), and apache-airflow-providers-cncf-kubernetes version 8.3.4. When I manually deploy the SparkApplication resource to the cluster (via kubectl), the application works fine and completes without errors. When I schedule the DAG, it starts fine and runs without interruption almost to the end, but then fails with a 404 error. It seems like it cannot communicate with the driver. However, when I check the cluster, the driver pod is still there and looks fine.
The error text:
[2024-09-10, 10:16:22 UTC] {taskinstance.py:3301} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 293, in execute
return super().execute(context=context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute
return self.execute_sync(context)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 634, in execute_sync
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 727, in await_xcom_sidecar_container_start
if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 645, in container_is_running
remote_pod = self.read_pod(pod)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 336, in wrapped_f
return copy(f, *args, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 475, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 418, in exc_check
raise retry_exc.reraise()
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 185, in reraise
raise self.last_attempt.result()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 478, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 720, in read_pod
return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23693, in read_namespaced_pod
return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs) # noqa: E501
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23780, in read_namespaced_pod_with_http_info
return self.api_client.call_api(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 373, in request
return self.rest_client.GET(url,
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 244, in GET
return self.request("GET", url,
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 238, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b75ef5bb-a810-46c8-a052-6b42307c5229', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '034ee985-68e4-4ac9-ba26-c2b9e6b4cd1d', 'X-Kubernetes-Pf-Prioritylevel-Uid': '2e7745b6-a7a8-493c-99ec-ce7f9c416cff', 'Date': 'Tue, 10 Sep 2024 10:16:22 GMT', 'Content-Length': '270'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"<my-spark-app>-task-qyzuffwi-driver\" not found","reason":"NotFound","details":{"name":"<my-spark-app>-task-qyzuffwi-driver","kind":"pods"},"code":404}
[2024-09-10, 10:16:22 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=<my-spark-app>, task_id=<my-spark-app>-task, run_id=scheduled__2024-09-10T09:15:00+00:00, execution_date=20240910T091500, start_date=20240910T091612, end_date=20240910T101622
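For reference, the 404 comes from the same CoreV1Api.read_namespaced_pod call that the pod manager keeps retrying. A minimal diagnostic sketch (assuming a local kubeconfig pointing at the same EKS cluster; the pod name is a placeholder taken from the error message) to check whether the pod name Airflow tracks still exists:

from kubernetes import client, config

# Assumes a local kubeconfig for the same EKS cluster; the pod name is a placeholder.
config.load_kube_config()
v1 = client.CoreV1Api()
try:
    # Same API call as pod_manager.read_pod in the traceback above.
    pod = v1.read_namespaced_pod("<my-spark-app>-task-qyzuffwi-driver", "default")
    print(pod.status.phase)
except client.exceptions.ApiException as e:
    print(e.status, e.reason)  # 404 / Not Found matches the task failure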
What you think should happen instead?
The DAG run should finish with the "success" status.
How to reproduce
Please keep in mind that all values in angle brackets are not real; they were replaced for security reasons. If you need any other information (e.g. Airflow config values), please let me know.
The DAG file looks like this:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago

default_args = {
    'owner': '<me>',
    'depends_on_past': False,
    'start_date': '2024-09-06',
    'email': ['<my@mail>'],
    'email_on_failure': True,
    'email_on_retry': False,
    'max_active_runs': 1,
    'retries': 0,
    'max_active_tis_per_dag': 1  # This parameter controls the number of concurrent running task instances across dag_runs per task.
    # 'catchup': False
}

with DAG(
    '<my-spark-app>',
    default_args=default_args,
    schedule_interval="*/10 * * * *",
    tags=['<my-spark-app>']
) as dag:
    spark_task = SparkKubernetesOperator(
        task_id="<my-spark-app>-task",
        application_file='<my-spark-app>.yaml',
        dag=dag,
        namespace='default',
        kubernetes_conn_id='<EKS_CONN>',
        do_xcom_push=True,
        params={"app_name": f"<my-spark-app>"}
    )

    sensor = SparkKubernetesSensor(
        task_id='<my-spark-app>-monitor',
        namespace="default",
        application_name="{{ task_instance.xcom_pull(task_ids='<my-spark-app>-task')['metadata']['name'] }}",
        kubernetes_conn_id="<EKS_CONN>",
        dag=dag,
        api_group="sparkoperator.k8s.io",
        api_version='v1beta2',
        attach_log=True
    )

    spark_task >> sensor
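Note that the failing call (await_xcom_sidecar_container_start) is reached only because do_xcom_push=True is set on the operator. A hypothetical variant for isolating the problem (not a fix; the sensor above relies on the XCom value, so the application name would have to be supplied another way):

spark_task_no_xcom = SparkKubernetesOperator(
    task_id='<my-spark-app>-task',
    application_file='<my-spark-app>.yaml',
    namespace='default',
    kubernetes_conn_id='<EKS_CONN>',
    do_xcom_push=False,  # skips await_xcom_sidecar_container_start, where the 404 is raised
)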
SparkApplication resource YAML looks like this:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  labels: &Labels
    app: <my-spark-app>
  name: <my-spark-app>
spec:
  type: Python
  restartPolicy:
    type: Never
  pythonVersion: "3"
  sparkVersion: "3.5.0"
  mode: cluster
  image: <my-ecr-repo>:<my-spark-app>
  mainApplicationFile: local:///<my-spark-app>/main.py
  sparkConf:
    spark.ui.port: "4040"
    spark.ui.showConsoleProgress: "true"
    spark.sql.broadcastTimeout: "6000"
    spark.hadoop.fs.s3a.multiobjectdelete.enable: "false"
    spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled: "true"
    spark.metrics.namespace: '<my-spark-app>-qa'
    spark.metrics.conf.*.sink.graphite.host: <graphite-exporter-address>
    spark.metrics.conf.*.sink.graphite.port: "<graphite-port>"
    spark.metrics.conf.*.sink.graphite.class: "org.apache.spark.metrics.sink.GraphiteSink"
    spark.metrics.conf.*.sink.graphite.period: "10"
    spark.metrics.conf.*.sink.graphite.unit: "seconds"
    spark.metrics.appStatusSource.enabled: "true"
    spark.driver.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
    spark.executor.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
    spark.metrics.conf.driver.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.executor.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.worker.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.master.source.jvm.class: org.apache.spark.metrics.source.JvmSource
  hadoopConf:
    spark.hadoop.fs.s3a.user.agent.prefix: '<my-spark-app>-qa'
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.bucket.all.committer.magic.enabled: "true"
    fs.s3a.endpoint: http://s3.eu-west-1.amazonaws.com
    fs.s3a.connection.ssl.enabled: "false"
  executor:
    nodeSelector:
      karpenter.sh/nodepool: default
    labels: *Labels
    serviceAccount: spark-operator-spark
    cores: 8
    coreRequest: "7500m"
    instances: 1
    memory: "40g"
  driver:
    nodeSelector:
      karpenter.sh/nodepool: ondemand
    labels: *Labels
    serviceAccount: spark-operator-spark
    cores: 4
    memory: "16g"
    env:
      - name: DEPLOY_ENV
        value: qa
Operating System
Airflow runs on an EKS cluster and was deployed using the Helm chart (version 1.16.0-dev).
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes -- 8.3.4
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct