
SparkKubernetesOperator not rendering template correctly #37261

@zlosim

Description

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

7.14.0

Apache Airflow version

2.7.2

Operating System

aws

Deployment

Amazon (AWS) MWAA

Deployment details

No response

What happened

I'm trying to run SparkKubernetesOperator with the application template passed as a Python dict via template_spec. In this dict I pass {{ds}} as one of the arguments for the app. When I check the Airflow UI, I can see it was rendered correctly:
[screenshot: the Rendered Template tab in the UI shows the resolved date]
But when I check the app in Kubernetes directly, I can see it was not submitted with the rendered argument:

$ kubectl get sparkapplications.sparkoperator.k8s.io -n spark xyz-ipc2kunh -o yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  creationTimestamp: "2024-02-08T19:58:25Z"
  generation: 1
  name: xyz-ipc2kunh
  namespace: spark
  resourceVersion: "203894782"
  uid: 52cbdba9-0269-4dce-b7d1-d14e822dcde4
spec:
  arguments:
  - '{{ds}}'
  driver:
    coreLimit: 1200m
    cores: 1
    labels:
      version: 3.5.0
    memory: 1g
    serviceAccount: spark-operator-spark
  executor:
    cores: 1
    labels:
      version: 3.5.0
    memory: 8g
  hadoopConf: {}
  image: spark:3.5.0
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar
  mainClass: org.apache.spark.examples.SparkPi
  mode: cluster
  restartPolicy:
    type: Never
  sparkConf:
    spark.jars.ivy: /tmp
    spark.kubernetes.authenticate.driver.serviceAccountName: zeppelin-server
    spark.kubernetes.driver.service.deleteOnTermination: "false"
  sparkVersion: 3.5.0
  timeToLiveSeconds: 1800
  type: Scala
status:
  applicationState:
    errorMessage: 'driver container failed with ExitCode: 1, Reason: Error'
    state: FAILED
  driverInfo:
    podName: xyz-ipc2kunh-driver
    webUIAddress: 10.100.194.61:0
    webUIPort: 4040
    webUIServiceName: xyz-ipc2kunh-ui-svc
  executionAttempts: 1
  executorState:
    spark-pi-4732d98d8a4cf813-exec-1: COMPLETED
  lastSubmissionAttemptTime: "2024-02-08T19:58:30Z"
  sparkApplicationId: spark-d87f2f6f028943c2910968a709719ec1
  submissionAttempts: 1
  submissionID: ca78be37-e059-4863-9259-1dfd2758aaa7
  terminationTime: "2024-02-08T19:58:43Z"
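
For context, my understanding of the mechanism (not a confirmed root cause): Airflow applies Jinja only to operator attributes listed in template_fields, and only when the task instance renders them at runtime; the Rendered Template tab renders its own copy. So if the operator builds the Kubernetes manifest from the spec before that runtime rendering happens, the UI can show the resolved date while the submitted object still carries the literal {{ds}}. A minimal sketch of how rendering reaches a nested dict (hypothetical operator, not the provider's actual code):

from airflow.models.baseoperator import BaseOperator


class SpecOperator(BaseOperator):
    # Only attributes named here are rendered; rendering recurses into
    # nested dicts and lists, so '{{ds}}' inside spec gets resolved.
    template_fields = ("spec",)

    def __init__(self, *, spec: dict, **kwargs):
        super().__init__(**kwargs)
        self.spec = spec

    def execute(self, context):
        # By execute() time self.spec is rendered, e.g. arguments == ["2023-12-02"].
        # Anything copied out of spec in __init__ would still hold "{{ds}}".
        self.log.info("arguments: %s", self.spec["spec"]["arguments"])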

What you think should happen instead

Airflow should submit the Spark application with the rendered template, i.e. with {{ds}} replaced by the actual date.
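
For the first scheduled run, for example, the submitted manifest would then be expected to contain the resolved date instead of the literal placeholder (the date below is illustrative):

spec:
  arguments:
  - '2023-12-02'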

How to reproduce

You can use this DAG and then inspect the SparkApplication it creates:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator


DAG_ID = "data-migration3"
spec = {'apiVersion': 'sparkoperator.k8s.io/v1beta2',
        'kind': 'SparkApplication',
        'metadata': {'namespace': 'spark'},
        'spec': {
            'arguments': ['{{ds}}'],
            'driver': {
                'coreLimit': '1200m',
                'cores': 1,
                'labels': {'version': '3.5.0'},
                'memory': '1g',
                'serviceAccount': 'spark-operator-spark',
            },
            'executor': {
                'cores': 1,
                'labels': {'version': '3.5.0'},
                'memory': '8g',
            },
            'hadoopConf': {},
            'image': 'spark:3.5.0',
            'imagePullPolicy': 'Always',
            'mainApplicationFile': 'local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar',
            'mainClass': 'org.apache.spark.examples.SparkPi',
            'mode': 'cluster',
            'restartPolicy': {'type': 'Never'},
            'sparkConf': {
                'spark.jars.ivy': '/tmp',
                'spark.kubernetes.authenticate.driver.serviceAccountName': 'zeppelin-server',
                'spark.kubernetes.driver.service.deleteOnTermination': 'false',
            },
            'sparkVersion': '3.5.0',
            'timeToLiveSeconds': 1800,
            'type': 'Scala'}}
with DAG(
        DAG_ID,
        max_active_runs=1,  # DAG-level setting; it has no effect inside default_args
        description="migrate data from backups to new tables",
        schedule="5 4 2 * *",
        start_date=datetime(2023, 11, 22),
        end_date=datetime(2023, 12, 22),
        catchup=True,
) as dag:
    SparkKubernetesOperator(
        base_container_name="spark-kubernetes-driver",
        retries=0,
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(minutes=3),
        retry_delay=timedelta(seconds=10),
        depends_on_past=False,
        task_id="xyz",
        namespace="spark",
        delete_on_termination=False,
        reattach_on_restart=True,
        get_logs=True,
        log_events_on_failure=True,
        name="xyz",
        template_spec=spec,
        kubernetes_conn_id='dwh_eks',
    )
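
Until this is fixed, a possible workaround (a sketch only, untested, assuming application_file is still a templated field with '.yaml' in this provider version's template_ext) is to keep the manifest in a YAML file so Jinja renders the file contents before the operator uses them:

# spark_app.yaml holds the same manifest as `spec` above, including
#   arguments: ["{{ ds }}"]
# Because the extension matches template_ext, Airflow loads and renders
# the file contents with Jinja at runtime.
SparkKubernetesOperator(
    task_id="xyz_from_file",  # hypothetical task id for this sketch
    namespace="spark",
    name="xyz",
    application_file="spark_app.yaml",  # path relative to the DAG file
    kubernetes_conn_id='dwh_eks',
)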

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
