Closed
Labels
area:core, area:providers, kind:bug, pending-response, provider:databricks
Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
DatabricksRunNowOperator started failing after upgrading the Databricks provider to version 6.7.0, with the following error:
[2024-07-15, 05:29:05 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 563, in _do_api_call
    for attempt in self._get_retry_object():
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 573, in _do_api_call
    response.raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://adb-2703548196728655.15.azuredatabricks.net/api/2.1/jobs/run-now

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 862, in execute
    self.run_id = hook.run_now(self.json)
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 243, in run_now
    response = self._do_api_call(RUN_NOW_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 580, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."}, Status Code: 400
I have verified that the same DAG works with version 6.6.0.
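To confirm which provider version a given environment actually runs when comparing 6.6.0 against 6.7.0, a minimal sketch along these lines may help. The helper name `installed_version` is hypothetical; the distribution name `apache-airflow-providers-databricks` is the PyPI name of the provider.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prints the provider version, or None if the provider is not installed.
    print(installed_version("apache-airflow-providers-databricks"))
```

Running this inside the scheduler or worker image (rather than on a local machine) shows the version the failing task actually used.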
What you think should happen instead?
No response
How to reproduce
- Run the DAG below with Databricks provider 6.7.0:
import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")

# Notebook path as a JSON object
notebook_task = '{"notebook_path": "/Users/x/quick_start"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True,
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True,
    )
    # [END howto_operator_databricks_run_now_async]

    opr_submit_run >> opr_run_now
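The error reports "Job 0", although the DAG never passes 0 explicitly, so it may be worth checking what the templated `job_id` renders to before the API call is made. The following is only a debugging sketch: `parse_job_id` is a hypothetical helper, not part of the Databricks provider, and it makes no claim about the root cause. It assumes default (non-native) template rendering, where a missing XCom value typically renders as the string "None".

```python
from typing import Any


def parse_job_id(rendered: Any) -> int:
    """Return the rendered job_id as a positive int, or raise ValueError."""
    try:
        job_id = int(str(rendered).strip())
    except ValueError:
        # Covers values such as "None" or "" that cannot be read as an integer.
        raise ValueError(
            f"job_id rendered to {rendered!r}, which is not an integer"
        ) from None
    if job_id <= 0:
        raise ValueError(f"job_id rendered to {job_id}, expected a positive integer")
    return job_id


if __name__ == "__main__":
    for candidate in ("123", "None", "", 0):
        try:
            print(repr(candidate), "->", parse_job_id(candidate))
        except ValueError as err:
            print(repr(candidate), "-> rejected:", err)
```

Calling such a check on the rendered value (for example from a small upstream task) would show whether the request sent to `run-now` carries a real job ID or a placeholder.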
Operating System
Linux
Versions of Apache Airflow Providers
databricks 6.7.0
Deployment
Astronomer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct