Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions task-sdk/src/airflow/sdk/definitions/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations

import asyncio
import json
import logging
from json import JSONDecodeError
Expand Down Expand Up @@ -226,24 +225,6 @@ def get(cls, conn_id: str) -> Any:
return _get_connection(conn_id)
except AirflowRuntimeError as e:
cls._handle_connection_error(e, conn_id)
except RuntimeError as e:
# The error from async_to_sync is a RuntimeError, so we have to fall back to text matching
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
import greenback

task = asyncio.current_task()
if greenback.has_portal(task):
import warnings

warnings.warn(
"You should not use sync calls here -- use `await Conn.async_get` instead",
stacklevel=2,
)

return greenback.await_(cls.async_get(conn_id))

log.exception("async_to_sync failed")
raise

@classmethod
async def async_get(cls, conn_id: str) -> Any:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ def get_connection(self, conn_id: str) -> Connection | None: # type: ignore[ove

# Convert ExecutionAPI response to SDK Connection
return _process_connection_result_conn(msg)
except RuntimeError as e:
# TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError
# when called within an async event loop. In greenback portal contexts (triggerer),
# we catch this and use greenback to call the async version instead.
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
import asyncio

import greenback

task = asyncio.current_task()
if greenback.has_portal(task):
import warnings

warnings.warn(
"You should not use sync calls here -- use `await aget_connection` instead",
stacklevel=2,
)
return greenback.await_(self.aget_connection(conn_id))
# Fall through to the general exception handler for other RuntimeErrors
return None
except Exception:
# If SUPERVISOR_COMMS fails for any reason, return None
# to allow fallback to other backends
Expand Down
38 changes: 38 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import pytest

from airflow.sdk.definitions.connection import Connection
from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend


Expand Down Expand Up @@ -120,6 +121,43 @@ def test_get_conn_value_not_implemented(self):
with pytest.raises(NotImplementedError, match="Use get_connection instead"):
backend.get_conn_value("test_conn")

def test_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms):
"""
Test that RuntimeError from async_to_sync triggers greenback fallback.

This test verifies the fix for issue #57145: when SUPERVISOR_COMMS.send()
raises the specific RuntimeError about async_to_sync in an event loop,
the backend catches it and uses greenback to call aget_connection().
"""

# Expected connection to be returned
expected_conn = Connection(
conn_id="databricks_default",
conn_type="databricks",
host="example.databricks.com",
)

# Simulate the RuntimeError that triggers greenback fallback
mock_supervisor_comms.send.side_effect = RuntimeError(
"You cannot use AsyncToSync in the same thread as an async event loop"
)

# Mock the greenback and asyncio modules that are imported inside the exception handler
mocker.patch("greenback.has_portal", return_value=True)
mock_greenback_await = mocker.patch("greenback.await_", return_value=expected_conn)
mocker.patch("asyncio.current_task")

backend = ExecutionAPISecretsBackend()
conn = backend.get_connection("databricks_default")

# Verify we got the expected connection
assert conn is not None
assert conn.conn_id == "databricks_default"
# Verify the greenback fallback was called
mock_greenback_await.assert_called_once()
# Verify send was attempted first (and raised RuntimeError)
mock_supervisor_comms.send.assert_called_once()


class TestContextDetection:
"""Test context detection in ensure_secrets_backend_loaded."""
Expand Down