Description
Apache Airflow version
3.1.3
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
When using XCom.get_value from the Task SDK / compat SDK inside an extra link plugin, the Airflow webserver/API fails with:
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'
This happens when the UI calls the /api/v2/.../links endpoint to resolve extra links for a task instance.
The same plugin works fine without the XCom.get_value(...) call (links render correctly). The problem only appears as soon as XCom.get_value is used in the BaseOperatorLink.get_link() implementation.
This looks like a bug / unsupported usage path where the Task SDK assumes it is running in a task runner context (with SUPERVISOR_COMMS) even though extra links are resolved in the API server / webserver context.
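To illustrate the suspected mechanism, here is a minimal sketch that does not use Airflow at all. It assumes (I have not confirmed this against the Task SDK source) that SUPERVISOR_COMMS is a module-level name that is only assigned once a task runner process starts. The module name fake_task_runner and the helpers lookup/try_lookup are hypothetical stand-ins:

```python
import sys
import types

# Hypothetical stand-in for a module whose global is assigned only at runtime.
fake = types.ModuleType("fake_task_runner")
sys.modules["fake_task_runner"] = fake


def lookup():
    # Function-level import, similar in shape to the one in the traceback.
    from fake_task_runner import SUPERVISOR_COMMS

    return SUPERVISOR_COMMS


def try_lookup():
    """Return the imported object, or the error text if the name is missing."""
    try:
        return lookup()
    except ImportError as exc:
        return f"ImportError: {exc}"


# "API server" situation: nothing has assigned the global yet.
before = try_lookup()

# "Task runner" situation: the process assigns the global during startup.
fake.SUPERVISOR_COMMS = object()
after = try_lookup()
```

If that assumption holds, any code path that reaches this import outside a running task would fail in the same way, which matches what the links endpoint shows.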
What you think should happen instead?
Using XCom.get_value(key="data_doc", ti_key=ti_key) inside BaseOperatorLink.get_link() shouldn’t crash the /api/v2/.../links endpoint.
If there is no XCom row, I’d expect None or similar; the extra link should either:
• return None/empty and be hidden, or
• return the URL string and render normally.
How to reproduce
- Add the plugin file below to $AIRFLOW_HOME/plugins (or the Astro project plugins/ directory).
from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink, XCom
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.standard.operators.python import PythonOperator


class RerunTask(BaseOperatorLink):
    name = "Rerun Task"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
        return "https://www.astronomer.io/"


class PythonOperatorLink(BaseOperatorLink):
    name = "GE Result"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
        return 'https://google.com'


class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        RerunTask(),
    ]
    global_operator_extra_links = [
        PythonOperatorLink(),
        RerunTask(),
    ]
- Restart Airflow.
- Trigger the example_extra_links DAG once.
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

with DAG(
    "example_extra_links",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
):
    def my_task(**context):
        context["ti"].xcom_push(key="data_doc", value="https://example.com/ge_result")

    python_task = PythonOperator(
        task_id="python_task_traditional",
        python_callable=my_task,
    )
- Go to Grid view → click python_task → open Details.
- API Server logs show the SUPERVISOR_COMMS ImportError.
Operating System
macOS
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
No response
Anything else?
Full Traceback:
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py", line 79, in collapse_excgroups
| yield
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 183, in __call__
| async with anyio.create_task_group() as task_group:
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/usr/local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/fastapi/applications.py", line 1082, in __call__
| await super().__call__(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/applications.py", line 113, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
| raise exc
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
| await self.app(scope, receive, _send)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 29, in __call__
| await responder(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 130, in __call__
| await super().__call__(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 46, in __call__
| await self.app(scope, receive, self.send_with_compression)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/cors.py", line 85, in __call__
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 182, in __call__
| with recv_stream, send_stream, collapse_excgroups():
| ^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
| self.gen.throw(value)
| File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py", line 85, in collapse_excgroups
| raise exc
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 184, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/astronomer/runtime/plugin.py", line 90, in dispatch
| response = await call_next(request)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 159, in call_next
| raise app_exc
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 144, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/usr/local/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
| await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 716, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 736, in app
| await route.handle(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 290, in handle
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 78, in app
| await wrap_app_handling_exceptions(app, request)(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
| raise exc
| File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 75, in app
| response = await f(request)
| ^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", line 308, in app
| raw_response = await run_endpoint_function(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", line 221, in run_endpoint_function
| return await run_in_threadpool(dependant.call, **values)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in run_in_threadpool
| return await anyio.to_thread.run_sync(func)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
| return await get_async_backend().run_sync_in_worker_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
| return await future
| ^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
| result = context.run(func, *args)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 81, in get_extra_links
| all_extra_links = {link_name: link_url or None for link_name, link_url in sorted(all_extra_link_pairs)}
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 79, in <genexpr>
| (link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 1407, in get_extra_links
| return link.get_link(self, ti_key=ti.key) # type: ignore[arg-type]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/airflow/plugins/extra_link_plugin.py", line 21, in get_link
| azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 153, in get_value
| return cls.get_one(
| ^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 246, in get_one
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
| ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct