Conversation

@seanghaeli
Contributor

When these lines were merged from #54083:

https://github.com/apache/airflow/pull/54083/files/5f2f2d8c95dfc4bf9bc98c7f9c3bb97f34062da6#diff-acfa34467241916c1189174e98f87a4094e8cbf58d391fa084e6afb2594decbcL61-L71

The Amazon provider system tests all started failing because the testing infrastructure does not run under task-sdk. An ImportError is thrown when the testing infrastructure tries to retrieve the connection via task-sdk. In this PR we handle that ImportError so that the testing infrastructure falls back to the default connection, which is how the tests are normally run. With this change, the tests pass.
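
For illustration, the fallback boils down to something like this (a rough sketch with hypothetical names, not the exact provider change; `default_factory` stands in for whatever the provider treats as its default connection):

```python
def get_connection_with_fallback(conn_id, default_factory):
    """Hypothetical helper: fetch a connection via the Task SDK, falling back
    to a default connection when the SDK execution-time machinery is absent."""
    try:
        # In this setup the lookup raises ImportError for SUPERVISOR_COMMS
        # when no task-sdk supervisor is running (as in the system test harness).
        from airflow.sdk.definitions.connection import Connection

        return Connection.get(conn_id)
    except ImportError:
        # Not running under the Task SDK supervisor: use the default
        # connection instead of failing the test.
        return default_factory()
```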

boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels Aug 20, 2025
Member

@potiuk left a comment


I think that will be fixed in final 3.1 (@amoghrajesh ?) - but for now it's good as it is.

potiuk merged commit 64f6de9 into apache:main Aug 20, 2025
77 checks passed
@potiuk
Member

potiuk commented Aug 20, 2025

Following the comment of @amoghrajesh -> this actually should be fixed by installing task-sdk in the system tests environment.

Question @seanghaeli -> is there something "special" you do to get the environment for system tests?

@o-nikolas
Contributor

> because the testing infrastructure does not use task-sdk

I'm curious about this; we're certainly using the Task API, and we're not doing anything special beyond running the breeze system tests commands.

> this actually should be fixed by installing task-sdk in the system tests environment.

This is interesting, the image we use for most test workers is either breeze itself (in the local case) or the apache airflow public image from dockerhub. If task-sdk isn't installed in either we can be sure to do that, but shouldn't it be there already @potiuk?

@amoghrajesh
Contributor

@o-nikolas yes it should. Could either of you show me some of the failures so that we can handle that better if needed? But long term (3.1+), task SDK should be the way to do these things.

@potiuk
Member

potiuk commented Aug 20, 2025

> This is interesting, the image we use for most test workers is either breeze itself (in the local case) or the apache airflow public image from dockerhub. If task-sdk isn't installed in either we can be sure to do that, but shouldn't it be there already @potiuk?

Task SDK is not installed in Airflow 2 - maybe you are using those when running system tests on it?

@o-nikolas
Contributor

> This is interesting, the image we use for most test workers is either breeze itself (in the local case) or the apache airflow public image from dockerhub. If task-sdk isn't installed in either we can be sure to do that, but shouldn't it be there already @potiuk?

> Task SDK is not installed in Airflow 2 - maybe you are using those when running system tests on it?

We're running on main (so effectively 3.1); this is how this recent change affected our builds.

@seanghaeli
Contributor Author

@amoghrajesh the end of the stacktrace I was seeing is:

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 158, in _get_connection
    from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)

This PR provides the workaround so that the testing infrastructure can handle the thrown error and then fall back to default credentials.

@o-nikolas
Contributor

@amoghrajesh Re:

> Could either of you show me some of the failures so that we can handle that better if needed?

Do you need anything further from @seanghaeli ?

mangal-vairalkar pushed a commit to mangal-vairalkar/airflow that referenced this pull request Aug 30, 2025
@o-nikolas
Contributor

Hey @amoghrajesh

We're seeing this in our Kubernetes system tests as well, when run in deferrable mode specifically. Here is a traceback:

AirflowException: Traceback (most recent call last):
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run
    state = await self._wait_for_pod_start()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 213, in _wait_for_pod_start
    pod = await self._get_pod()
          ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/tenacity/__init__.py", line 420, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/tenacity/__init__.py", line 187, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/python/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py", line 276, in _get_pod
    pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 851, in get_pod
    async with self.get_conn() as connection:
               ^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 838, in get_conn
    kube_client = await self._load_config() or async_client.ApiClient()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 757, in _load_config
    in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
                                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 828, in _get_field
    extras = await self.get_conn_extras()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 816, in get_conn_extras
    connection = await sync_to_async(self.get_connection)(self.conn_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/asgiref/sync.py", line 439, in __call__
    ret = await asyncio.shield(exec_coro)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/thread.py", line 59, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/site-packages/asgiref/sync.py", line 493, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in get_connection
    return super().get_connection(conn_id)  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61, in get_connection
    conn = Connection.get(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 203, in get
    return _get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 167, in _get_connection
    from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)

I would prefer not to put another catch-log-ignore in the cncf hook like we did here for the amazon provider (ideally I'd like to remove that one as well).

Is it possible for you to have a look at this one?

@o-nikolas
Contributor

o-nikolas commented Sep 4, 2025

I've done a lot of deep diving and I think I've identified what's happening here. So the above exception is actually coming from the reentry point when we're returning from the triggerer. Just before that exception in the logs we see:

[2025-09-03 19:16:03,685] {pod.py:909} ERROR - Trigger emitted an error event, failing the task: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
[2025-09-03T19:16:03.685+0000] {pod.py:909} ERROR - Trigger emitted an error event, failing the task: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
[2025-09-03T19:16:20.896+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:28.631+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:36.345+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:44.060+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03 19:17:03,348] {pod.py:1181} INFO - Skipping deleting pod: run-pod-xxeffbtb
[2025-09-03T19:17:03.348+0000] {pod.py:1181} INFO - Skipping deleting pod: run-pod-xxeffbtb
2025-09-03 19:17:03 [error    ] Task failed with exception     [task]

And I see successful communication with the Task API comms above that. So it's actually within the Triggerer scope that we're failing to communicate with the Task API to get connection information.

This is because in the dag.test() world we run triggerers inline here, and we do not set up the in-process supervisor comms like we do for regular task execution in dag.test() here.
Triggers definitely need supervisor comms set up; you can see we do that in the normal case (i.e. not dag.test()) here.

So I believe we need to update the inline execution of triggers in dag.test() to set up supervisor comms.

I have a draft PR here with what I think that might look like. But likely needs refinement: #55236
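
Roughly, the idea is something like this (hypothetical names and structure for illustration only, not what #55236 actually does):

```python
from airflow.sdk.execution_time import task_runner


def run_trigger_inline(trigger, inprocess_comms):
    """Hypothetical wrapper around the inline trigger execution in dag.test()."""
    # Point the task-runner's module-level SUPERVISOR_COMMS at the same
    # in-process comms object used for task execution, so Connection.get()
    # (and similar lookups) inside the trigger can reach the API server.
    task_runner.SUPERVISOR_COMMS = inprocess_comms
    try:
        # Real code would iterate the trigger's async generator; simplified here.
        return trigger.run()
    finally:
        # Clean up so later code doesn't see a stale comms object.
        del task_runner.SUPERVISOR_COMMS
```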

CC @kaxil @amoghrajesh
