
Databricks Provider _get_databricks_task_id only cleanses task id #44250

@mwoods-familiaris

Description

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.13.*

Apache Airflow version

2.10.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Astronomer

Deployment details

No response

What happened

_get_databricks_task_id only cleanses the task id portion of the key. The relevant lines are:

return f"{task.dag_id}__{task.task_id.replace('.', '__')}"

task_id = f"{self.dag_id}__{task_id.replace('.', '__')}"

However, the dag_id may also contain "." characters, so the replacement of "." with "__" should be applied to the whole string, not just the task id portion. Otherwise, periods in the dag_id result in errors such as:

[2024-11-21, 13:12:42 GMT] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1252, in execute
    self.monitor_databricks_job()
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1203, in monitor_databricks_job
    current_task_run_id = self._get_current_databricks_task()["run_id"]
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1165, in _get_current_databricks_task
    return {task["task_key"]: task for task in sorted_task_runs}[
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'my.airflow.dag.with.periods__my_airflow_task'

(This happens because Databricks silently strips the invalid characters, so the task key on the Databricks side is myairflowdagwithperiods__my_airflow_task rather than my.airflow.dag.with.periods__my_airflow_task.)
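The mismatch can be sketched in plain Python. This is a hypothetical model, not the provider's code: current_task_key mirrors the expression quoted above, and databricks_stored_key assumes Databricks keeps only letters, digits, underscores and hyphens. That character set is an assumption based on the observed stripping, not documented behaviour.

```python
import re


def current_task_key(dag_id: str, task_id: str) -> str:
    # Mirrors the provider expression quoted above: only task_id is cleansed.
    return f"{dag_id}__{task_id.replace('.', '__')}"


def databricks_stored_key(key: str) -> str:
    # ASSUMPTION: models Databricks silently dropping characters outside
    # [A-Za-z0-9_-]; the real server-side rule may differ.
    return re.sub(r"[^A-Za-z0-9_-]", "", key)


dag_id = "my.airflow.dag.with.periods"
task_id = "my_airflow_task"

expected = current_task_key(dag_id, task_id)  # key Airflow will look up
stored = databricks_stored_key(expected)      # key Databricks reports back

# Task runs indexed by the key Databricks returns (dummy run_id for illustration),
# similar in spirit to the dict built in _get_current_databricks_task.
task_runs = {stored: {"run_id": 123}}

print(expected)  # my.airflow.dag.with.periods__my_airflow_task
print(stored)    # myairflowdagwithperiods__my_airflow_task
try:
    task_runs[expected]["run_id"]
except KeyError as err:
    print("KeyError:", err)
```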

What you think should happen instead

The replacement of "." with "__" should be applied to the whole task key / run name string, not just the task id portion.
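A minimal sketch of the proposed behaviour, assuming the key keeps its "{dag_id}__{task_id}" shape: the period replacement runs over the assembled string, so both halves are cleansed. build_task_key is a hypothetical name used for illustration, not the provider's API.

```python
def build_task_key(dag_id: str, task_id: str) -> str:
    # Hypothetical helper: assemble the key first, then replace every "."
    # so periods in dag_id are handled the same way as those in task_id
    # (task_ids inside TaskGroups also contain ".").
    return f"{dag_id}__{task_id}".replace(".", "__")


key = build_task_key("my.airflow.dag.with.periods", "group.my_airflow_task")
print(key)  # my__airflow__dag__with__periods__group__my_airflow_task
```

For dag_ids whose only problematic character is ".", the result contains nothing for Databricks to strip, so the key Airflow later looks up should match the one Databricks returns.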

How to reproduce

Use an affected operator (e.g. DatabricksNotebookOperator) in a DAG whose dag_id contains ".".

Anything else

Every time

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
