Description
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
apache-airflow-providers-databricks==6.13.*
Apache Airflow version
2.10.2
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Astronomer
Deployment details
No response
What happened
_get_databricks_task_id only sanitizes the task_id portion of the generated key, ref:
airflow/providers/src/airflow/providers/databricks/plugins/databricks_workflow.py
Line 67 in a924284
return f"{task.dag_id}__{task.task_id.replace('.', '__')}"
task_id = f"{self.dag_id}__{task_id.replace('.', '__')}"
However, the dag_id may also contain periods, so the replacement of . with __ should be applied to the whole string, not just the task_id portion. Otherwise, periods in the DAG name result in errors such as:
[2024-11-21, 13:12:42 GMT] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1252, in execute
self.monitor_databricks_job()
File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1203, in monitor_databricks_job
current_task_run_id = self._get_current_databricks_task()["run_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 1165, in _get_current_databricks_task
return {task["task_key"]: task for task in sorted_task_runs}[
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'my.airflow.dag.with.periods__my_airflow_task'
(The invalid characters are silently stripped by Databricks, so the task key on the Databricks side is myairflowdagwithperiods__my_airflow_task rather than my.airflow.dag.with.periods__my_airflow_task.)
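The mismatch can be illustrated without Airflow or Databricks. In the rough sketch below, build_task_key mirrors the expression quoted from the plugin, while strip_periods and lookup_fails are hypothetical helpers: strip_periods stands in for the Databricks-side behaviour and models only the period stripping observed above, not any other sanitization Databricks may perform.

```python
def build_task_key(dag_id: str, task_id: str) -> str:
    # Mirrors the expression quoted in this issue: only task_id is cleansed.
    return f"{dag_id}__{task_id.replace('.', '__')}"


def strip_periods(task_key: str) -> str:
    # Hypothetical stand-in for Databricks dropping periods from the key.
    return task_key.replace(".", "")


def lookup_fails(dag_id: str, task_id: str) -> bool:
    # Key that Airflow expects to find among the Databricks task runs.
    airflow_key = build_task_key(dag_id, task_id)
    # Simulated task runs, keyed the way Databricks stored them.
    task_runs = [{"task_key": strip_periods(airflow_key), "run_id": 1}]
    runs_by_key = {run["task_key"]: run for run in task_runs}
    try:
        runs_by_key[airflow_key]
    except KeyError:
        return True
    return False
```

Under these assumptions, lookup_fails("my.airflow.dag.with.periods", "my_airflow_task") reports a failed lookup, while a dag_id without periods is found as expected.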
What you think should happen instead
The replacement of . with __ should be applied to the whole task key / run name string, not just the task_id portion.
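A minimal sketch of the proposed behaviour, assuming the key is still built as dag_id, a double underscore, then task_id. The function name databricks_task_key is hypothetical and is not the provider's actual helper; the only change from the quoted expression is that the replacement runs over the combined string.

```python
def databricks_task_key(dag_id: str, task_id: str) -> str:
    # Join first, then replace, so periods in dag_id are covered as well.
    return f"{dag_id}__{task_id}".replace(".", "__")
```

With this approach, a dag_id of my.airflow.dag.with.periods and a task_id of my_airflow_task would yield my__airflow__dag__with__periods__my_airflow_task, which contains no periods for Databricks to strip.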
How to reproduce
Use an affected operator, e.g. DatabricksNotebookOperator, in a DAG whose dag_id contains a period (.).
Anything else
The error occurs every time.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct