Description
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
apache-airflow-providers-databricks==7.2.0
Apache Airflow version
2.10.5
Operating System
Debian Bookworm
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
When a DatabricksWorkflowTaskGroup contains at least one task that has a child task, the launch task generates a wrong JSON payload: tasks with parents set their own task_key in the depends_on field.
What you think should happen instead
Tasks with a parent (or parents) should set the parent's task_key in depends_on instead of their own task_key.
How to reproduce
Create a DatabricksWorkflowTaskGroup, then add task_A >> task_B.
task_B will put its own task_key in the "depends_on" field instead of task_A's task_key. A minimal reproduction is sketched below.
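A minimal reproduction sketch, assuming a notebook-based workflow; the connection id, notebook paths, and DAG id are placeholders:

import pendulum
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

with DAG(
    dag_id="databricks_workflow_depends_on_repro",  # placeholder DAG id
    start_date=pendulum.datetime(2025, 1, 1),
    schedule=None,
) as dag:
    with DatabricksWorkflowTaskGroup(
        group_id="workflow",
        databricks_conn_id="databricks_default",  # placeholder connection
    ) as workflow:
        task_A = DatabricksNotebookOperator(
            task_id="task_A",
            databricks_conn_id="databricks_default",
            notebook_path="/Workspace/example/notebook_a",  # placeholder path
            source="WORKSPACE",
        )
        task_B = DatabricksNotebookOperator(
            task_id="task_B",
            databricks_conn_id="databricks_default",
            notebook_path="/Workspace/example/notebook_b",  # placeholder path
            source="WORKSPACE",
        )
        # task_B's depends_on should reference task_A's task_key,
        # but the generated payload contains task_B's own task_key.
        task_A >> task_B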
Anything else
I think the issue is located in airflow/providers/databricks/operators/databricks.py.
The first time _generate_databricks_task_key is executed, whether or not task_id is provided, self._databricks_task_key is set. That means no matter how many times _generate_databricks_task_key is called afterwards, even with different task_id parameters, it always returns the same value: the one computed on the first call.
def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
    """Create a databricks task key using the hash of dag_id and task_id."""
    if not self._databricks_task_key or len(self._databricks_task_key) > 100:
        self.log.info(
            "databricks_task_key has not be provided or the provided one exceeds 100 characters and will be truncated by the Databricks API. This will cause failure when trying to monitor the task. A task_key will be generated using the hash value of dag_id+task_id"
        )
        task_id = task_id or self.task_id
        task_key = f"{self.dag_id}__{task_id}".encode()
        self._databricks_task_key = hashlib.md5(task_key).hexdigest()
        self.log.info("Generated databricks task_key: %s", self._databricks_task_key)
    return self._databricks_task_key
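To see the effect in isolation, here is a standalone sketch with a hypothetical stand-in class (not the provider code) that mimics the caching pattern above:

import hashlib

class FakeOperator:
    """Hypothetical stand-in mimicking the caching in _generate_databricks_task_key."""

    def __init__(self, dag_id: str, task_id: str):
        self.dag_id = dag_id
        self.task_id = task_id
        self._databricks_task_key = None

    def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
        # Once the cached value exists, the task_id argument is never used again.
        if not self._databricks_task_key or len(self._databricks_task_key) > 100:
            task_id = task_id or self.task_id
            key = f"{self.dag_id}__{task_id}".encode()
            self._databricks_task_key = hashlib.md5(key).hexdigest()
        return self._databricks_task_key

op = FakeOperator(dag_id="my_dag", task_id="task_B")
print(op._generate_databricks_task_key())          # hash of my_dag__task_B
print(op._generate_databricks_task_key("task_A"))  # same hash: task_A is ignored, the cached value is returned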
If we check the block of code that converts a task into a Databricks task, it sets "task_key" to self.databricks_task_key. At that point, if _generate_databricks_task_key is called again, with or without a task_id parameter, it will always return the same value because self._databricks_task_key is no longer None.
def _convert_to_databricks_workflow_task(
    self, relevant_upstreams: list[BaseOperator], context: Context | None = None
) -> dict[str, object]:
    """Convert the operator to a Databricks workflow task that can be a task in a workflow."""
    base_task_json = self._get_task_base_json()
    result = {
        "task_key": self.databricks_task_key,
        "depends_on": [
            {"task_key": self._generate_databricks_task_key(task_id)}
            for task_id in self.upstream_task_ids
            if task_id in relevant_upstreams
        ],
        **base_task_json,
    }

    if self.existing_cluster_id and self.job_cluster_key:
        raise ValueError(
            "Both existing_cluster_id and job_cluster_key are set. Only one can be set per task."
        )

    if self.existing_cluster_id:
        result["existing_cluster_id"] = self.existing_cluster_id
    elif self.job_cluster_key:
        result["job_cluster_key"] = self.job_cluster_key

    return result
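One possible direction for a fix (a sketch only, not a tested patch) would be to bypass the cached value whenever an explicit task_id is passed, i.e. when the key is being computed for an upstream task in depends_on:

def _generate_databricks_task_key(self, task_id: str | None = None) -> str:
    """Create a databricks task key using the hash of dag_id and task_id."""
    if task_id is not None:
        # Key requested for another task (e.g. an upstream task in depends_on):
        # compute it directly instead of returning this operator's cached key.
        return hashlib.md5(f"{self.dag_id}__{task_id}".encode()).hexdigest()
    if not self._databricks_task_key or len(self._databricks_task_key) > 100:
        task_key = f"{self.dag_id}__{self.task_id}".encode()
        self._databricks_task_key = hashlib.md5(task_key).hexdigest()
        self.log.info("Generated databricks task_key: %s", self._databricks_task_key)
    return self._databricks_task_key

A real fix would also need to handle upstream tasks that set an explicit databricks_task_key, since their key would not match the dag_id+task_id hash.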
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct