DatabricksNotebookOperator generating invalid dependency graph #47983

@dheerajkumar-solanki

Description

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.24.0
apache-airflow-providers-databricks==7.2.1
databricks-sql-connector==4.0.0

Apache Airflow version

2.10.3

Operating System

macOS

Deployment

Docker-Compose

Deployment details

Testing using AWS MWAA Local Runner

What happened

After upgrading the Databricks provider to version 7.2.1 and testing a DAG that uses DatabricksNotebookOperator, the DAG fails with the following error:
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Invalid dependency graph, task 'e771895875324e2902a93fdc2ff36326' can not reference itself."}, Status Code: 400
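
For illustration, the task entry in the Jobs API request presumably ends up looking something like the fragment below (a speculative sketch; field names follow the Databricks Jobs API 2.1, and the hash is the task key quoted in the error):

broken_task = {
    # depends_on points at the task's own task_key, which Databricks
    # rejects with INVALID_PARAMETER_VALUE
    "task_key": "e771895875324e2902a93fdc2ff36326",
    "depends_on": [{"task_key": "e771895875324e2902a93fdc2ff36326"}],
    # other fields (notebook_task, job_cluster_key, ...) omitted
}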

What you think should happen instead

Ideally, the operator should generate the correct dependencies between the Databricks tasks and deploy the Databricks job. Instead, it generates a wrong dependency graph for the Databricks tasks, with a task referencing itself as its own dependency.
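
For comparison, here is a minimal sketch of what the tasks array should look like (the task keys are illustrative; the provider may derive the actual keys differently, e.g. by hashing):

expected_tasks = [
    {
        "task_key": "test_workflow__workflow_notebook_1",
        "job_cluster_key": "Shared_job_cluster",
        "notebook_task": {"notebook_path": "/Shared/Notebook_1", "source": "WORKSPACE"},
    },
    {
        "task_key": "test_workflow__workflow_notebook_2",
        "job_cluster_key": "Shared_job_cluster",
        "notebook_task": {"notebook_path": "/Shared/Notebook_2", "source": "WORKSPACE"},
        # the second task must reference the *first* task's key, never its own
        "depends_on": [{"task_key": "test_workflow__workflow_notebook_1"}],
    },
]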

How to reproduce

Below is the DAG code to reproduce the error:

import os
from airflow.models.dag import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from airflow.utils.timezone import datetime

DATABRICKS_CONN_ID = os.getenv("DATABRICKS_CONN_ID", "databricks_default")

job_cluster_spec = [
    {
        "job_cluster_key": "Shared_job_cluster",
        "new_cluster": {
            "cluster_name": "",
            "spark_version": "11.3.x-scala2.12",
            "num_workers": 1,
            "spark_conf": {},
            "node_type_id": "r3.xlarge",
            "ssh_public_keys": [],
            "custom_tags": {},
            "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
            "cluster_source": "JOB",
            "init_scripts": [],
        },
    }
]

dag = DAG(
    dag_id="example_databricks_workflow",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
)
with dag:
    task_group = DatabricksWorkflowTaskGroup(
        group_id="test_workflow",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
    )
    with task_group:
        notebook_1 = DatabricksNotebookOperator(
            task_id="workflow_notebook_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_1",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
        )

        notebook_2 = DatabricksNotebookOperator(
            task_id="workflow_notebook_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_2",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
        )

        notebook_1 >> notebook_2

The code is taken from the Databricks provider's example DAGs: https://github.com/apache/airflow/blob/providers-databricks/7.2.1/providers/databricks/tests/system/databricks/example_databricks_workflow.py
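
One way to narrow this down: the Airflow-side relationships can be checked with something like the snippet below (a minimal sketch, run against the DAG above). If the upstream set is correct here, the bug is likely in how the provider translates these relationships into the Jobs API depends_on field:

notebook_2_task = dag.get_task("test_workflow.workflow_notebook_2")
print(notebook_2_task.upstream_task_ids)
# expected to contain 'test_workflow.workflow_notebook_1'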

Anything else

This worked fine with apache-airflow-providers-databricks==6.12.0; the error started appearing after upgrading to version 7.2.1.
The issue may be related to this PR: #44960

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
