Skip to content

XCom.get_value() via SDK fails in extra link plugin with SUPERVISOR_COMMS ImportError #59093

@varaprasadregani

Description

@varaprasadregani

Apache Airflow version

3.1.3

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

When using XCom.get_value from the Task SDK / compat SDK inside an extra link plugin, the Airflow webserver/API fails with:
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'

This happens when the UI calls the /api/v2/.../links endpoint to resolve extra links for a task instance.

The same plugin works fine without the XCom.get_value(...) call (links render correctly). The problem only appears as soon as XCom.get_value is used in the BaseOperatorLink.get_link() implementation.

This looks like a bug / unsupported usage path where the Task SDK assumes it is running in a task runner context (with SUPERVISOR_COMMS) even though extra links are resolved in the API server / webserver context.

What you think should happen instead?

Using XCom.get_value(key="data_doc", ti_key=ti_key) inside BaseOperatorLink.get_link() shouldn’t crash the /api/v2/.../links endpoint.
Image

If there is no XCom row, I’d expect None or similar; the extra link should either:
• return None/empty and be hidden, or
• return the URL string and render normally.

How to reproduce

  1. Add the plugin file above to $AIRFLOW_HOME/plugins (or the Astro project plugins/ directory).
from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink, XCom

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin

from airflow.providers.standard.operators.python import PythonOperator


class RerunTask(BaseOperatorLink):
    name = "Rerun Task"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
        return "https://www.astronomer.io/"


class PythonOperatorLink(BaseOperatorLink):
    name = "GE Result"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
        return 'https://google.com'

class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"

    operator_extra_links = [
        RerunTask(),
    ]

    global_operator_extra_links = [
        PythonOperatorLink(),
        RerunTask(),
    ]
  1. Restart Airflow.
  2. Trigger the example_extra_links DAG once.
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

with DAG(
    "example_extra_links",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
):
    def my_task(**context):
        context["ti"].xcom_push(key="data_doc", value="https://example.com/ge_result")

    python_task = PythonOperator(
        task_id="python_task_traditional",
        python_callable=my_task,
    )
  1. Go to Grid view → click python_task → open Details.
  2. API Server logs show the SUPERVISOR_COMMS ImportError.

Operating System

MacOS

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

Full Traceback:

ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py", line 79, in collapse_excgroups
  |     yield
  |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 183, in __call__
  |     async with anyio.create_task_group() as task_group:
  |                ^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    |     result = await app(  # type: ignore[func-returns-value]
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/fastapi/applications.py", line 1082, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/applications.py", line 113, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 186, in __call__
    |     raise exc
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 164, in __call__
    |     await self.app(scope, receive, _send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 29, in __call__
    |     await responder(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 130, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 46, in __call__
    |     await self.app(scope, receive, self.send_with_compression)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/cors.py", line 85, in __call__
    |     await self.app(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 182, in __call__
    |     with recv_stream, send_stream, collapse_excgroups():
    |                                    ^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    |     self.gen.throw(value)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py", line 85, in collapse_excgroups
    |     raise exc
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 184, in __call__
    |     response = await self.dispatch_func(request, call_next)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/astronomer/runtime/plugin.py", line 90, in dispatch
    |     response = await call_next(request)
    |                ^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 159, in call_next
    |     raise app_exc
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 144, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
    |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 716, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 736, in app
    |     await route.handle(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 290, in handle
    |     await self.app(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 78, in app
    |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 75, in app
    |     response = await f(request)
    |                ^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", line 308, in app
    |     raw_response = await run_endpoint_function(
    |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", line 221, in run_endpoint_function
    |     return await run_in_threadpool(dependant.call, **values)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in run_in_threadpool
    |     return await anyio.to_thread.run_sync(func)
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
    |     return await get_async_backend().run_sync_in_worker_thread(
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
    |     return await future
    |            ^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
    |     result = context.run(func, *args)
    |              ^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 81, in get_extra_links
    |     all_extra_links = {link_name: link_url or None for link_name, link_url in sorted(all_extra_link_pairs)}
    |                                                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py", line 79, in <genexpr>
    |     (link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 1407, in get_extra_links
    |     return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
    |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/airflow/plugins/extra_link_plugin.py", line 21, in get_link
    |     azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
    |                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 153, in get_value
    |     return cls.get_one(
    |            ^^^^^^^^^^^^
    |   File "/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 246, in get_one
    |     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
    | ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions