Description
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
apache-airflow-providers-common-sql==1.24.0
apache-airflow-providers-databricks==7.2.1
databricks-sql-connector==4.0.0
Apache Airflow version
2.10.3
Operating System
macOS
Deployment
Docker-Compose
Deployment details
Testing with the AWS MWAA Local Runner
What happened
After upgrading the Databricks provider to version 7.2.1 and testing a DAG with DatabricksNotebookOperator, the DAG fails with the error below:
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Invalid dependency graph, task 'e771895875324e2902a93fdc2ff36326' can not reference itself."}, Status Code: 400
What you think should happen instead
Ideally, the provider should generate the correct dependencies between the Databricks tasks and deploy the Databricks job. Instead, it generates a wrong dependency graph in which a task references itself as its own dependency.
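For illustration, the depends_on relationships in the generated Jobs API payload should roughly follow the sketch below. The task_key values here are hypothetical placeholders; the provider actually generates hashed keys like the one in the error message. With 7.2.1, the second task appears to end up in its own depends_on list, which is exactly what the INVALID_PARAMETER_VALUE error rejects.

# Hedged sketch of the "tasks" section of the Databricks Jobs API payload;
# the task_key values are hypothetical placeholders, not the hashed keys the
# provider really generates.

# Expected: workflow_notebook_2 depends on workflow_notebook_1.
expected_tasks = [
    {"task_key": "workflow_notebook_1", "depends_on": []},
    {
        "task_key": "workflow_notebook_2",
        "depends_on": [{"task_key": "workflow_notebook_1"}],
    },
]

# Observed with provider 7.2.1 (per the error): a task references itself,
# so the Jobs API rejects the dependency graph with INVALID_PARAMETER_VALUE.
observed_tasks = [
    {"task_key": "workflow_notebook_1", "depends_on": []},
    {
        "task_key": "workflow_notebook_2",
        "depends_on": [{"task_key": "workflow_notebook_2"}],
    },
]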
How to reproduce
Below is the DAG code to reproduce the error:
import os

from airflow.models.dag import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from airflow.utils.timezone import datetime

DATABRICKS_CONN_ID = os.getenv("DATABRICKS_CONN_ID", "databricks_default")

job_cluster_spec = [
    {
        "job_cluster_key": "Shared_job_cluster",
        "new_cluster": {
            "cluster_name": "",
            "spark_version": "11.3.x-scala2.12",
            "num_workers": 1,
            "spark_conf": {},
            "node_type_id": "r3.xlarge",
            "ssh_public_keys": [],
            "custom_tags": {},
            "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
            "cluster_source": "JOB",
            "init_scripts": [],
        },
    }
]

dag = DAG(
    dag_id="example_databricks_workflow",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
)

with dag:
    task_group = DatabricksWorkflowTaskGroup(
        group_id="test_workflow",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
    )
    with task_group:
        notebook_1 = DatabricksNotebookOperator(
            task_id="workflow_notebook_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_1",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
        )
        notebook_2 = DatabricksNotebookOperator(
            task_id="workflow_notebook_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_2",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
        )
        notebook_1 >> notebook_2
The code is taken from the Databricks example DAGs: https://github.com/apache/airflow/blob/providers-databricks/7.2.1/providers/databricks/tests/system/databricks/example_databricks_workflow.py
Anything else
It was working fine with apache-airflow-providers-databricks==6.12.0; after upgrading to version 7.2.1, the error started appearing.
The issue may be related to this PR: #44960
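As a temporary workaround, assuming the older provider still satisfies the other dependency constraints, pinning the previous version avoids the problem until the dependency graph generation is fixed:

apache-airflow-providers-databricks==6.12.0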
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct