DatabricksRunNowOperator failing as named parameters Jinja templating not getting resolved #40788

@vatsrahul1001

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

DatabricksRunNowOperator started failing with the error below after upgrading the Databricks provider to version 6.7.0:


[2024-07-15, 05:29:05 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 563, in _do_api_call
    for attempt in self._get_retry_object():
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 573, in _do_api_call
    response.raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://adb-2703548196728655.15.azuredatabricks.net/api/2.1/jobs/run-now
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 862, in execute
    self.run_id = hook.run_now(self.json)
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 243, in run_now
    response = self._do_api_call(RUN_NOW_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 580, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."}, Status Code: 400

I have verified that it works with provider version 6.6.0.
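The title attributes the failure to Jinja templates in named parameters not being resolved. As a rough illustration of how that kind of failure can arise, the sketch below uses a hypothetical `SketchOperator` (not the provider's actual code) and `string.Template` as a stand-in for Jinja. It only shows the general ordering problem: if the request payload is assembled before the named attribute is rendered, the payload keeps the raw template string. Whether this is the exact mechanism behind the 6.7.0 regression is an assumption.

```python
from string import Template


class SketchOperator:
    """Hypothetical stand-in for an operator with a templated named parameter."""

    def __init__(self, job_id: str, build_payload_early: bool) -> None:
        self.job_id = job_id
        self.build_payload_early = build_payload_early
        # Early assembly copies the raw, still-unrendered template string.
        self.json = {"job_id": job_id} if build_payload_early else {}

    def render_template_fields(self, context: dict) -> None:
        # Stand-in for template rendering: only the named attribute is
        # rendered here; a payload built earlier is not touched.
        self.job_id = Template(self.job_id).substitute(context)

    def execute(self) -> dict:
        # Late assembly reads the attribute after it has been rendered.
        if not self.build_payload_early:
            self.json["job_id"] = self.job_id
        return self.json


def run(build_payload_early: bool) -> dict:
    op = SketchOperator("$upstream_job_id", build_payload_early)
    op.render_template_fields({"upstream_job_id": "42"})
    return op.execute()
```

In this sketch, `run(True)` returns a payload that still contains the unrendered placeholder, while `run(False)` returns the rendered value. A payload of the first kind would be rejected by an API that expects a real job ID.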

What you think should happen instead?

No response

How to reproduce

  1. Run the DAG below with Databricks provider 6.7.0.

import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
# Notebook path as a JSON object
notebook_task = '{"notebook_path": "/Users/x/quick_start"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}


with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True
    )
    # [END howto_operator_databricks_run_now_async]

    opr_submit_run >> opr_run_now

Operating System

Linux

Versions of Apache Airflow Providers

databricks 6.7.0

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
