Description
Apache Airflow Provider(s)
databricks
Versions of Apache Airflow Providers
apache-airflow-providers-databricks==7.7.1
Apache Airflow version
3.0.6
Operating System
macOS
Deployment
Amazon (AWS) MWAA
Deployment details
We are using AWS MWAA with Airflow 3.0.6, and the provider versions are pinned according to the official constraints file.
What happened
In the DAG we define the following mapped task:

```python
task = DatabricksSubmitRunOperator.partial(
    task_id=JOB_TASK_ID,
    databricks_conn_id="dev_databricks_conn_sample",
    new_cluster=CLUSTER,
    libraries=LIBRARIES,
    deferrable=True,
    dag=dag,
    access_control_list=ACL,
).expand_kwargs(kwargs=fn_get_market_list())
```
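For context, `expand_kwargs` maps the operator once per keyword-argument dict. `fn_get_market_list` is not shown in this report; a hypothetical version returning a list of dicts could look like:

```python
# Hypothetical sketch of fn_get_market_list (not from the report):
# expand_kwargs creates one mapped task instance per dict, merging
# each dict into the .partial(...) arguments above.
def fn_get_market_list():
    return [
        {"json": {"run_name": "market_us"}},
        {"json": {"run_name": "market_eu"}},
    ]
```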
The error we are getting in the Airflow triggerer logs is:
```
ERROR - Trigger failed:
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 963, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1072, in run_trigger
    async for event in trigger.run():
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/triggers/databricks.py", line 90, in run
    run_state = await self.hook.a_get_run_state(self.run_id)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks.py", line 514, in a_get_run_state
    response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
    url = self._endpoint_url(full_endpoint)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
    port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
               ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/functools.py", line 998, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
    return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 478, in get_connection_from_secrets
    conn = TaskSDKConnection.get(conn_id=conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 144, in get
    return _get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 160, in _get_connection
    msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send
    return async_to_sync(self.asend)(msg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__
    raise RuntimeError(
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
```
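For anyone triaging: the final `RuntimeError` can be shown with a small stdlib-only sketch (no Airflow or asgiref needed). A sync bridge like asgiref's `AsyncToSync` refuses to block when called from a thread that already has a running event loop, which is exactly the triggerer's situation when the trigger tries to fetch the connection synchronously:

```python
import asyncio


def sync_bridge(async_fn):
    """Mimic asgiref's AsyncToSync guard: refuse to block inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop in this thread: safe to start one and block on it.
        return asyncio.run(async_fn())
    raise RuntimeError(
        "You cannot use AsyncToSync in the same thread as an async event loop"
        " - just await the async function directly."
    )


async def get_connection():
    # Stand-in for the async connection lookup the trigger needs.
    return "dev_databricks_conn_sample"


# Called from ordinary sync code, the bridge works fine:
print(sync_bridge(get_connection))  # dev_databricks_conn_sample


# Called from inside an already-running loop (the triggerer's case), it raises:
async def trigger_run():
    try:
        sync_bridge(get_connection)
    except RuntimeError as exc:
        print(type(exc).__name__)  # RuntimeError


asyncio.run(trigger_run())
```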
What you think should happen instead
The task should defer and later resume successfully; the trigger should be able to fetch the Databricks connection from inside its event loop instead of failing with this RuntimeError.
How to reproduce
Run the DAG above with deferrable=True on MWAA (Airflow 3.0.6, apache-airflow-providers-databricks==7.7.1); the trigger fails as soon as it tries to fetch the Databricks connection. Is there a known fix or workaround?
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct