
Databricks Operator sets its own task_key in depends_on instead of parent task key #47614

@pacmora

Description


Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==7.2.0

Apache Airflow version

2.10.5

Operating System

Debian Bookworm

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

When a DatabricksWorkflowTaskGroup contains at least one task that has a child task, the launch task generates a wrong JSON payload: tasks with parents set their own task_key in the depends_on field.

What you think should happen instead

Tasks with one or more parents should set the parent's task_key instead of their own task_key.

How to reproduce

Create a DatabricksWorkflowTaskGroup, then add task_A >> task_B.

task_B will add its own task_key in the "depends_on" field instead of task_A's task_key.

Anything else

I think the issue is located in the file airflow/providers/databricks/operators/databricks.py.

The first time _generate_databricks_task_key is executed, self._databricks_task_key is set whether or not a task_id is provided. This means that no matter how many times _generate_databricks_task_key is called afterwards, even with different task_id parameters, it always returns the same value: the one computed on the first call.


def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
    """Create a databricks task key using the hash of dag_id and task_id."""
    if not self._databricks_task_key or len(self._databricks_task_key) > 100:
        self.log.info(
            "databricks_task_key has not be provided or the provided one exceeds 100 characters and will be truncated by the Databricks API. This will cause failure when trying to monitor the task. A task_key will be generated using the hash value of dag_id+task_id"
        )
        task_id = task_id or self.task_id
        task_key = f"{self.dag_id}__{task_id}".encode()
        self._databricks_task_key = hashlib.md5(task_key).hexdigest()
        self.log.info("Generated databricks task_key: %s", self._databricks_task_key)
    return self._databricks_task_key
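The caching behavior can be demonstrated with a minimal stand-in class (TaskKeyDemo is hypothetical; only the hashing and caching logic mirror the snippet above):

```python
import hashlib


class TaskKeyDemo:
    """Minimal stand-in (not the real operator) that mimics the caching
    in _generate_databricks_task_key."""

    def __init__(self, dag_id: str, task_id: str):
        self.dag_id = dag_id
        self.task_id = task_id
        self._databricks_task_key = None

    def generate_key(self, task_id=None):
        # Bug: once the key is cached, the same value is returned
        # for every subsequent call, regardless of task_id.
        if not self._databricks_task_key:
            task_id = task_id or self.task_id
            self._databricks_task_key = hashlib.md5(
                f"{self.dag_id}__{task_id}".encode()
            ).hexdigest()
        return self._databricks_task_key


op = TaskKeyDemo("my_dag", "task_B")
own_key = op.generate_key()             # caches task_B's key
parent_key = op.generate_key("task_A")  # still returns task_B's key
print(own_key == parent_key)            # True: depends_on gets task_B's own key
```

This is exactly what happens when depends_on is built: the first call (for the task itself) fills the cache, and every later call for an upstream task_id hits the cached value.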

If we check the block of code that converts a task into a Databricks workflow task, it sets "task_key" to self.databricks_task_key. At that point, if _generate_databricks_task_key is called again, with or without a task_id param, it always returns the same value, because self._databricks_task_key is no longer None.

def _convert_to_databricks_workflow_task(
    self, relevant_upstreams: list[BaseOperator], context: Context | None = None
) -> dict[str, object]:
    """Convert the operator to a Databricks workflow task that can be a task in a workflow."""
    base_task_json = self._get_task_base_json()
    result = {
        "task_key": self.databricks_task_key,
        "depends_on": [
            {"task_key": self._generate_databricks_task_key(task_id)}
            for task_id in self.upstream_task_ids
            if task_id in relevant_upstreams
        ],
        **base_task_json,
    }

    if self.existing_cluster_id and self.job_cluster_key:
        raise ValueError(
            "Both existing_cluster_id and job_cluster_key are set. Only one can be set per task."
        )
    if self.existing_cluster_id:
        result["existing_cluster_id"] = self.existing_cluster_id
    elif self.job_cluster_key:
        result["job_cluster_key"] = self.job_cluster_key

    return result
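One possible direction for a fix (a sketch under my assumptions, not the actual patch) is to derive upstream keys from the upstream task_id directly, without going through the cached instance attribute. A pure helper like the hypothetical generate_databricks_task_key below gives each upstream its own key:

```python
import hashlib


def generate_databricks_task_key(dag_id: str, task_id: str) -> str:
    """Pure helper: hash dag_id + task_id, with no instance-level caching."""
    return hashlib.md5(f"{dag_id}__{task_id}".encode()).hexdigest()


# depends_on would then reference the parent's key, not the current task's:
dag_id = "my_dag"
upstream_ids = ["task_A"]
depends_on = [
    {"task_key": generate_databricks_task_key(dag_id, tid)}
    for tid in upstream_ids
]
own_key = generate_databricks_task_key(dag_id, "task_B")
print(depends_on[0]["task_key"] != own_key)  # True: parent and child keys differ
```

Because the helper is stateless, calling it for the task itself and then for each upstream task cannot collide the way the cached version does.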

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
