
SparkKubernetesOperator does not retrieve logs from the driver pod or display them in the Airflow UI #37681

@thispejo

Description


Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

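I was using Airflow 2.6.1, and after upgrading to 2.8.1 the SparkKubernetesOperator task stopped retrieving the driver pod logs and showing them in the UI. In the task log below, the operator creates the SparkApplication and the task is marked SUCCESS about 0.24 seconds later, with no driver output.

The code: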

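from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Submits the SparkApplication defined in the YAML template.
elt_task = SparkKubernetesOperator(
    task_id='ex_elt',
    namespace='spark',
    application_file='runner_elt_digital_coupon.yaml',
    do_xcom_push=True,
    params={
        'clientId': Variable.get("clientId", default_var="0"),
        'reprocess': Variable.get("reprocess", default_var="0")
    }
)

# Waits for the application to finish; attach_log=True should append the driver pod log to this task's log.
exit_etl = SparkKubernetesSensor(
    task_id='exit_elt',
    namespace='spark',
    application_name='runner-elt-digital-coupon',
    poke_interval=30,
    timeout=10800,
    attach_log=True
)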
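With `attach_log=True`, the sensor is the piece expected to surface driver output. As we read the provider's sensor code of this period, once a poke sees a terminal application state it takes the driver pod name from the SparkApplication status and appends that pod's log to the sensor's own task log (`exit_elt`), not to the operator's (`ex_elt`). The sketch below reproduces that lookup by hand so the driver log can be checked independently of Airflow. It assumes the Spark operator's `v1beta2` status layout (`status.driverInfo.podName`); the helper names and the `PodLogReader` protocol are hypothetical, the protocol standing in for `kubernetes.client.CoreV1Api` so the functions can be exercised without a cluster.

```python
from typing import Any, Dict, Optional, Protocol


class PodLogReader(Protocol):
    """The single method used from kubernetes.client.CoreV1Api (assumed stand-in)."""

    def read_namespaced_pod_log(self, name: str, namespace: str, **kwargs: Any) -> str: ...


def driver_pod_from_app(app: Dict[str, Any]) -> Optional[str]:
    """Return the driver pod name recorded in a SparkApplication's status, or None.

    Assumes the spark-on-k8s-operator v1beta2 layout (status.driverInfo.podName).
    A freshly created object has no status yet, so this returns None for it.
    """
    status = app.get("status") or {}
    return (status.get("driverInfo") or {}).get("podName")


def read_driver_log(
    api: PodLogReader, app: Dict[str, Any], tail_lines: Optional[int] = None
) -> Optional[str]:
    """Fetch the driver log for a SparkApplication dict; None if no driver pod is recorded."""
    pod_name = driver_pod_from_app(app)
    if pod_name is None:
        return None
    kwargs: Dict[str, Any] = {}
    if tail_lines is not None:
        kwargs["tail_lines"] = tail_lines
    return api.read_namespaced_pod_log(
        name=pod_name, namespace=app["metadata"]["namespace"], **kwargs
    )


# Against a real cluster (needs the `kubernetes` package and RBAC to read pods/log):
#   from kubernetes import client, config
#   config.load_incluster_config()  # or config.load_kube_config() outside a pod
#   app = client.CustomObjectsApi().get_namespaced_custom_object(
#       "sparkoperator.k8s.io", "v1beta2", "spark", "sparkapplications", "runner-elt-digital-coupon")
#   print(read_driver_log(client.CoreV1Api(), app, tail_lines=200))
```

If the sensor behaves as described, the driver output should show up in the UI under `exit_elt`, so that task's log is the one to check before concluding the logs are lost.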

Log (task ex_elt):

[2024-02-24, 21:02:44 UTC] {__init__.py:54} DEBUG - Loading core task runner: StandardTaskRunner
[2024-02-24, 21:02:44 UTC] {base_task_runner.py:72} DEBUG - Planning to run as the user
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]>
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, There are enough open slots in default_pool to execute the task
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [queued]>
[2024-02-24, 21:02:44 UTC] {taskinstance.py:2171} INFO - Starting attempt 1 of 1
[2024-02-24, 21:02:44 UTC] {taskinstance.py:2192} INFO - Executing <Task(SparkKubernetesOperator): ex_elt> on 2024-02-24 21:02:41.702796+00:00
[2024-02-24, 21:02:44 UTC] {standard_task_runner.py:60} INFO - Started process 133 to run task
[2024-02-24, 21:02:44 UTC] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'dashboard_digital_coupon', 'ex_elt', 'manual__2024-02-24T21:02:41.702796+00:00', '--job-id', '38', '--raw', '--subdir', 'DAGS_FOLDER/git_devops/airflowdags/dags/dashboard/digital-coupon/dashboard_digital_coupon.py', '--cfg-path', '/tmp/tmp64gjofv0']
[2024-02-24, 21:02:44 UTC] {standard_task_runner.py:88} INFO - Job 38: Subtask ex_elt
[2024-02-24, 21:02:44 UTC] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7f7f904a53a0>]
[2024-02-24, 21:02:44 UTC] {task_command.py:423} INFO - Running <TaskInstance: dashboard_digital_coupon.ex_elt manual__2024-02-24T21:02:41.702796+00:00 [running]> on host airflow-homo-worker-0.airflow-homo-worker-hl.airflow-homo.svc.cluster.local
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1747} DEBUG - Clearing XCom data
[2024-02-24, 21:02:44 UTC] {retries.py:93} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2024-02-24, 21:02:44 UTC] {retries.py:93} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
[2024-02-24, 21:02:44 UTC] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='xpto@xpto.com.br' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dashboard_digital_coupon' AIRFLOW_CTX_TASK_ID='ex_elt' AIRFLOW_CTX_EXECUTION_DATE='2024-02-24T21:02:41.702796+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-24T21:02:41.702796+00:00'
[2024-02-24, 21:02:44 UTC] {__init__.py:115} DEBUG - Preparing lineage inlets and outlets
[2024-02-24, 21:02:44 UTC] {__init__.py:154} DEBUG - inlets: [], outlets: []
[2024-02-24, 21:02:44 UTC] {base.py:83} INFO - Using connection ID 'kubernetes_default' for task execution.
[2024-02-24, 21:02:44 UTC] {rest.py:231} DEBUG - response body: {"apiVersion":"sparkoperator.k8s.io/v1beta2","kind":"SparkApplication","metadata":{"creationTimestamp":"2024-02-24T21:02:44Z","generation":1,"managedFields":[{"apiVersion":"sparkoperator.k8s.io/v1beta2","fieldsType":"FieldsV1","fieldsV1":{"f:spec":{".":{},"f:driver":{".":{},"f:coreLimit":{},"f:cores":{},"f:env":{},"f:memory":{},"f:serviceAccount":{}},"f:dynamicAllocation":{".":{},"f:enabled":{}},"f:executor":{".":{},"f:cores":{},"f:instances":{},"f:memory":{}},"f:image":{},"f:imagePullPolicy":{},"f:imagePullSecrets":{},"f:mainApplicationFile":{},"f:mode":{},"f:nodeSelector":{".":{},"f:name":{}},"f:pythonVersion":{},"f:restartPolicy":{".":{},"f:onFailureRetries":{},"f:onFailureRetryInterval":{},"f:onSubmissionFailureRetries":{},"f:onSubmissionFailureRetryInterval":{}},"f:sparkConf":{".":{},"f:spark.driver.extraJavaOptions":{},"f:spark.executor.extraJavaOptions":{},"f:spark.kubernetes.driver.terminationGracePeriodSeconds":{},"f:spark.kubernetes.executor.terminationGracePeriodSeconds":{}},"f:sparkVersion":{},"f:timeToLiveSeconds":{},"f:type":{}}},"manager":"OpenAPI-Generator","operation":"Update","time":"2024-02-24T21:02:44Z"}],"name":"runner-elt-digital-coupon","namespace":"spark","resourceVersion":"177941341","uid":"58532468-f329-462c-88a7-8722245092c0"},"spec":{"driver":{"coreLimit":"1200m","cores":1,"env":[{"name":"SPARK_DRIVER_MEMORY","value":"2g"},{"name":"SPARK_DRIVER_JAVA_OPTS","value":"-Xss4m"}],"memory":"2g","serviceAccount":"default"},"dynamicAllocation":{"enabled":false},"executor":{"cores":1,"instances":1,"memory":"4g"},"image":"iad.ocir.io/idfv4xzttkan/xptover-spark:last","imagePullPolicy":"Always","imagePullSecrets":["ocirsecret"],"mainApplicationFile":"local:///app/dash_elt_digital_coupon.py","mode":"cluster","nodeSelector":{"name":"Spark"},"pythonVersion":"3","restartPolicy":{"onFailureRetries":3,"onFailureRetryInterval":10,"onSubmissionFailureRetries":5,"onSubmissionFailureRetryInterval":20},"sparkConf":{"spark.driver.extraJavaOptions":"-Xss4m","spark.executor.extraJavaOptions":"-Xss4m","spark.kubernetes.driver.terminationGracePeriodSeconds":"90","spark.kubernetes.executor.terminationGracePeriodSeconds":"90"},"sparkVersion":"3.5.0","timeToLiveSeconds":15,"type":"Python"}}
[2024-02-24, 21:02:44 UTC] {kubernetes.py:316} DEBUG - Response: {'apiVersion': 'sparkoperator.k8s.io/v1beta2', 'kind': 'SparkApplication', 'metadata': {'creationTimestamp': '2024-02-24T21:02:44Z', 'generation': 1, 'managedFields': [{'apiVersion': 'sparkoperator.k8s.io/v1beta2', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:spec': {'.': {}, 'f:driver': {'.': {}, 'f:coreLimit': {}, 'f:cores': {}, 'f:env': {}, 'f:memory': {}, 'f:serviceAccount': {}}, 'f:dynamicAllocation': {'.': {}, 'f:enabled': {}}, 'f:executor': {'.': {}, 'f:cores': {}, 'f:instances': {}, 'f:memory': {}}, 'f:image': {}, 'f:imagePullPolicy': {}, 'f:imagePullSecrets': {}, 'f:mainApplicationFile': {}, 'f:mode': {}, 'f:nodeSelector': {'.': {}, 'f:name': {}}, 'f:pythonVersion': {}, 'f:restartPolicy': {'.': {}, 'f:onFailureRetries': {}, 'f:onFailureRetryInterval': {}, 'f:onSubmissionFailureRetries': {}, 'f:onSubmissionFailureRetryInterval': {}}, 'f:sparkConf': {'.': {}, 'f:spark.driver.extraJavaOptions': {}, 'f:spark.executor.extraJavaOptions': {}, 'f:spark.kubernetes.driver.terminationGracePeriodSeconds': {}, 'f:spark.kubernetes.executor.terminationGracePeriodSeconds': {}}, 'f:sparkVersion': {}, 'f:timeToLiveSeconds': {}, 'f:type': {}}}, 'manager': 'OpenAPI-Generator', 'operation': 'Update', 'time': '2024-02-24T21:02:44Z'}], 'name': 'runner-elt-digital-coupon', 'namespace': 'spark', 'resourceVersion': '177941341', 'uid': '58532468-f329-462c-88a7-8722245092c0'}, 'spec': {'driver': {'coreLimit': '1200m', 'cores': 1, 'env': [{'name': 'SPARK_DRIVER_MEMORY', 'value': '2g'}, {'name': 'SPARK_DRIVER_JAVA_OPTS', 'value': '-Xss4m'}], 'memory': '2g', 'serviceAccount': 'default'}, 'dynamicAllocation': {'enabled': False}, 'executor': {'cores': 1, 'instances': 1, 'memory': '4g'}, 'image': 'iad.ocir.io/idfv47cttkan/xpto-spark:last', 'imagePullPolicy': 'Always', 'imagePullSecrets': ['***'], 'mainApplicationFile': 'local:///app/dash_elt_digital_coupon.py', 'mode': 'cluster', 'nodeSelector': {'name': 'Spark'}, 'pythonVersion': '3', 'restartPolicy': {'onFailureRetries': 3, 'onFailureRetryInterval': 10, 'onSubmissionFailureRetries': 5, 'onSubmissionFailureRetryInterval': 20}, 'sparkConf': {'spark.driver.extraJavaOptions': '-Xss4m', 'spark.executor.extraJavaOptions': '-Xss4m', 'spark.kubernetes.driver.terminationGracePeriodSeconds': '90', 'spark.kubernetes.executor.terminationGracePeriodSeconds': '90'}, 'sparkVersion': '3.5.0', 'timeToLiveSeconds': 15, 'type': 'Python'}}
[2024-02-24, 21:02:44 UTC] {__init__.py:73} DEBUG - Lineage called with inlets: [], outlets: []
[2024-02-24, 21:02:44 UTC] {taskinstance.py:538} DEBUG - Clearing next_method and next_kwargs.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=dashboard_digital_coupon, task_id=ex_elt, execution_date=20240224T210241, start_date=20240224T210244, end_date=20240224T210244
[2024-02-24, 21:02:44 UTC] {taskinstance.py:516} DEBUG - Task Duration set to 0.242214
[2024-02-24, 21:02:44 UTC] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
[2024-02-24, 21:02:44 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-02-24, 21:02:44 UTC] {dagrun.py:813} DEBUG - number of tis tasks for <DagRun dashboard_digital_coupon @ 2024-02-24 21:02:41.702796+00:00: manual__2024-02-24T21:02:41.702796+00:00, state:running, queued_at: 2024-02-24 21:02:41.715274+00:00. externally triggered: True>: 4 task(s)
[2024-02-24, 21:02:44 UTC] {dagrun.py:834} DEBUG - number of scheduleable tasks for <DagRun dashboard_digital_coupon @ 2024-02-24 21:02:41.702796+00:00: manual__2024-02-24T21:02:41.702796+00:00, state:running, queued_at: 2024-02-24 21:02:41.715274+00:00. externally triggered: True>: 2 task(s)
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.ex_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'exit_elt'}
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1947} DEBUG - Dependencies not met for <TaskInstance: dashboard_digital_coupon.ex_es manual__2024-02-24T21:02:41.702796+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'exit_elt'}
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.exit_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'ex_es'}
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1947} DEBUG - Dependencies not met for <TaskInstance: dashboard_digital_coupon.exit_es manual__2024-02-24T21:02:41.702796+00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=0, skipped=0, failed=0, upstream_failed=0, removed=0, done=0, success_setup=0, skipped_setup=0), upstream_task_ids={'ex_es'}
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.exit_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Ready To Reschedule' PASSED: True, Task is not in reschedule mode.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.exit_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:1966} DEBUG - <TaskInstance: dashboard_digital_coupon.exit_es manual__2024-02-24T21:02:41.702796+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2024-02-24, 21:02:44 UTC] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check
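Two details in this log bear on the missing output. The SparkApplication returned at creation has no `status` block yet, and `ex_elt` is marked SUCCESS 0.24 seconds later, so the operator has no driver information to report at that point. The spec also sets `timeToLiveSeconds: 15`; the Spark operator deletes a SparkApplication that many seconds after it terminates, while the sensor polls only every 30 seconds. This log does not confirm it, but it is worth ruling out: if the application finishes just after a poke, it may be gone before the next one, leaving the sensor nothing to read logs from. A worst-case check, with a hypothetical helper name and ignoring poke execution time and garbage-collection delay:

```python
from typing import Any, Dict


def ttl_may_expire_between_pokes(spec: Dict[str, Any], poke_interval: float) -> bool:
    """Return True if a terminated SparkApplication could be deleted before the next poke.

    Worst case: the application terminates right after a poke, so the sensor's next
    look comes roughly poke_interval seconds later. Poke duration and GC lag are ignored.
    """
    ttl = spec.get("timeToLiveSeconds")
    if ttl is None:
        # Without a TTL the Spark operator does not garbage-collect the application on its own.
        return False
    return ttl < poke_interval


# Values from this issue: spec.timeToLiveSeconds = 15, SparkKubernetesSensor(poke_interval=30).
print(ttl_may_expire_between_pokes({"timeToLiveSeconds": 15}, poke_interval=30))  # True
```

If this returns True for a given DAG, raising `timeToLiveSeconds` above the sensor's `poke_interval` (or lowering the interval) is a cheap way to test the hypothesis.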

What do you think should happen instead?

No response

How to reproduce

Run the same operator and sensor definitions shown under "What happened?" with the following setup:

executor: "CeleryKubernetesExecutor"

requirements.txt:
authlib
discord-webhook
kubernetes>=21.7.0,<24
apache-airflow-providers-apache-spark
pytz
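Note that `SparkKubernetesOperator` and `SparkKubernetesSensor` ship in `apache-airflow-providers-cncf-kubernetes`, not in `apache-airflow-providers-apache-spark` listed above, so the version that matters here is whichever cncf-kubernetes provider is installed in the worker image. A minimal sketch for reporting the relevant versions from inside a worker, using only the standard library's `importlib.metadata` (the package list is our choice, not from the issue):

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# Distributions relevant to this issue; extend as needed.
PACKAGES = (
    "apache-airflow",
    "apache-airflow-providers-cncf-kubernetes",
    "apache-airflow-providers-apache-spark",
    "kubernetes",
)


def installed_versions(packages: Iterable[str] = PACKAGES) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is absent."""
    result: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


for name, ver in installed_versions().items():
    print(f"{name}: {ver or 'not installed'}")
```

For providers alone, `airflow providers list` run in the same container reports the same information.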

Operating System

Kubernetes v1.26.2

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct
