Skip to content

Commit e3a2ebe

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
feat: add A2A support in Agent Engine
PiperOrigin-RevId: 804787822
1 parent 8cf7994 commit e3a2ebe

File tree

9 files changed

+8
-698
lines changed

9 files changed

+8
-698
lines changed

setup.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,6 @@
142142
"google-adk >= 1.0.0, < 2.0.0",
143143
]
144144

145-
a2a_extra_require = [
146-
"a2a-sdk >= 0.3.4",
147-
]
148-
149145
reasoning_engine_extra_require = [
150146
"cloudpickle >= 3.0, < 4.0",
151147
"google-cloud-trace < 2",
@@ -329,7 +325,6 @@
329325
"ray": ray_extra_require,
330326
"ray_testing": ray_testing_extra_require,
331327
"adk": adk_extra_require,
332-
"a2a": a2a_extra_require,
333328
"reasoningengine": reasoning_engine_extra_require,
334329
"agent_engines": agent_engines_extra_require,
335330
"evaluation": evaluation_extra_require,

tests/unit/vertex_langchain/test_agent_engines.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3482,7 +3482,7 @@ def test_update_class_methods_spec_with_registered_operation_not_found(self):
34823482
"register the API methods: "
34833483
"https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#custom-methods. "
34843484
"Error: {Unsupported api mode: `UNKNOWN_API_MODE`, "
3485-
"Supported modes are: ``, `a2a_extension`, `async`, `async_stream`, "
3485+
"Supported modes are: ``, `async`, `async_stream`, "
34863486
"`bidi_stream`, `stream`.}"
34873487
),
34883488
),

tests/unit/vertexai/genai/test_agent_engines.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2073,7 +2073,7 @@ def test_update_agent_engine_description(self, mock_await_operation):
20732073
"register the API methods: "
20742074
"https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/develop/custom#custom-methods. "
20752075
"Error: {Unsupported api mode: `UNKNOWN_API_MODE`, "
2076-
"Supported modes are: ``, `a2a_extension`, `async`, `async_stream`, `stream`.}"
2076+
"Supported modes are: ``, `async`, `async_stream`, `stream`.}"
20772077
),
20782078
),
20792079
],

vertexai/_genai/_agent_engines_utils.py

Lines changed: 2 additions & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
Union,
4545
)
4646

47-
import httpx
48-
4947
import proto
5048

5149
from google.api_core import exceptions
@@ -106,32 +104,6 @@
106104
Session = Any
107105

108106

109-
try:
110-
from a2a.types import (
111-
AgentCard,
112-
TransportProtocol,
113-
Message,
114-
TaskIdParams,
115-
TaskQueryParams,
116-
)
117-
from a2a.client import ClientConfig, ClientFactory
118-
119-
AgentCard = AgentCard
120-
TransportProtocol = TransportProtocol
121-
Message = Message
122-
ClientConfig = ClientConfig
123-
ClientFactory = ClientFactory
124-
TaskIdParams = TaskIdParams
125-
TaskQueryParams = TaskQueryParams
126-
except (ImportError, AttributeError):
127-
AgentCard = None
128-
TransportProtocol = None
129-
Message = None
130-
ClientConfig = None
131-
ClientFactory = None
132-
TaskIdParams = None
133-
TaskQueryParams = None
134-
135107
_ACTIONS_KEY = "actions"
136108
_ACTION_APPEND = "append"
137109
_AGENT_FRAMEWORK_ATTR = "agent_framework"
@@ -176,8 +148,6 @@
176148
_REQUIREMENTS_FILE = "requirements.txt"
177149
_STANDARD_API_MODE = ""
178150
_STREAM_API_MODE = "stream"
179-
_A2A_EXTENSION_MODE = "a2a_extension"
180-
_A2A_AGENT_CARD = "a2a_agent_card"
181151
_WARNINGS_KEY = "warnings"
182152
_WARNING_MISSING = "missing"
183153
_WARNING_INCOMPATIBLE = "incompatible"
@@ -536,32 +506,11 @@ def _generate_class_methods_spec_or_raise(
536506

537507
class_method = _to_proto(schema_dict)
538508
class_method[_MODE_KEY_IN_SCHEMA] = mode
539-
if hasattr(agent, "agent_card"):
540-
class_method[_A2A_AGENT_CARD] = getattr(
541-
agent, "agent_card"
542-
).model_dump_json()
543509
class_methods_spec.append(class_method)
544510

545511
return class_methods_spec
546512

547513

548-
def _is_pydantic_serializable(param: inspect.Parameter) -> bool:
549-
"""Checks if the parameter is pydantic serializable."""
550-
551-
if param.annotation == inspect.Parameter.empty:
552-
return True
553-
554-
if isinstance(param.annotation, str):
555-
return False
556-
557-
pydantic = _import_pydantic_or_raise()
558-
try:
559-
pydantic.TypeAdapter(param.annotation)
560-
return True
561-
except Exception:
562-
return False
563-
564-
565514
def _generate_schema(
566515
f: Callable[..., Any],
567516
*,
@@ -623,7 +572,7 @@ def _generate_schema(
623572
)
624573
# For a bidi endpoint, it requires an asyncio.Queue as the input, but
625574
# it is not JSON serializable. We hence exclude it from the schema.
626-
and param.annotation != asyncio.Queue and _is_pydantic_serializable(param)
575+
and param.annotation != asyncio.Queue
627576
}
628577
parameters = pydantic.create_model(f.__name__, **fields_dict).schema()
629578
# Postprocessing
@@ -941,7 +890,6 @@ def _register_api_methods_or_raise(
941890
_ASYNC_API_MODE: _wrap_async_query_operation,
942891
_STREAM_API_MODE: _wrap_stream_query_operation,
943892
_ASYNC_STREAM_API_MODE: _wrap_async_stream_query_operation,
944-
_A2A_EXTENSION_MODE: _wrap_a2a_operation,
945893
}
946894
if isinstance(wrap_operation_fn, dict) and api_mode in wrap_operation_fn:
947895
# Override the default function with user-specified function if it exists.
@@ -958,13 +906,7 @@ def _register_api_methods_or_raise(
958906
)
959907

960908
# Bind the method to the object.
961-
if api_mode == _A2A_EXTENSION_MODE:
962-
agent_card = operation_schema.get(_A2A_AGENT_CARD)
963-
method = _wrap_operation(
964-
method_name=method_name, agent_card=agent_card
965-
) # type: ignore[call-arg]
966-
else:
967-
method = _wrap_operation(method_name=method_name) # type: ignore[call-arg]
909+
method = _wrap_operation(method_name=method_name) # type: ignore[call-arg]
968910
method.__name__ = method_name
969911
if method_description and isinstance(method_description, str):
970912
method.__doc__ = method_description
@@ -1580,125 +1522,6 @@ async def _method(self: genai_types.AgentEngine, **kwargs) -> AsyncIterator[Any]
15801522
return _method
15811523

15821524

1583-
def _wrap_a2a_operation(method_name: str, agent_card: str) -> Callable[..., list]:
1584-
"""Wraps an Agent Engine method, creating a callable for A2A API.
1585-
1586-
Args:
1587-
method_name: The name of the Agent Engine method to call.
1588-
agent_card: The agent card to use for the A2A API call.
1589-
Example:
1590-
{'additionalInterfaces': None,
1591-
'capabilities': {'extensions': None,
1592-
'pushNotifications': None,
1593-
'stateTransitionHistory': None,
1594-
'streaming': False},
1595-
'defaultInputModes': ['text'],
1596-
'defaultOutputModes': ['text'],
1597-
'description': (
1598-
'A helpful assistant agent that can answer questions.'
1599-
),
1600-
'documentationUrl': None,
1601-
'iconUrl': None,
1602-
'name': 'Q&A Agent',
1603-
'preferredTransport': 'JSONRPC',
1604-
'protocolVersion': '0.3.0',
1605-
'provider': None,
1606-
'security': None,
1607-
'securitySchemes': None,
1608-
'signatures': None,
1609-
'skills': [{
1610-
'description': (
1611-
'A helpful assistant agent that can answer questions.'
1612-
),
1613-
'examples': ['Who is leading 2025 F1 Standings?',
1614-
'Where can i find an active volcano?'],
1615-
'id': 'question_answer',
1616-
'inputModes': None,
1617-
'name': 'Q&A Agent',
1618-
'outputModes': None,
1619-
'security': None,
1620-
'tags': ['Question-Answer']}],
1621-
'supportsAuthenticatedExtendedCard': True,
1622-
'url': 'http://localhost:8080/',
1623-
'version': '1.0.0'}
1624-
Returns:
1625-
A callable object that executes the method on the Agent Engine via
1626-
the A2A API.
1627-
"""
1628-
1629-
async def _method(self, **kwargs) -> Any:
1630-
"""Wraps an Agent Engine method, creating a callable for A2A API."""
1631-
if not self.api_client:
1632-
raise ValueError("api_client is not initialized.")
1633-
if not self.api_resource:
1634-
raise ValueError("api_resource is not initialized.")
1635-
a2a_agent_card = AgentCard(**json.loads(agent_card))
1636-
# A2A + AE integration currently only supports Rest API.
1637-
if (
1638-
a2a_agent_card.preferred_transport
1639-
and a2a_agent_card.preferred_transport != TransportProtocol.http_json
1640-
):
1641-
raise ValueError(
1642-
"Only HTTP+JSON is supported for preferred transport on agent card "
1643-
)
1644-
1645-
# Set preferred transport to HTTP+JSON if not set.
1646-
if not hasattr(a2a_agent_card, "preferred_transport"):
1647-
a2a_agent_card.preferred_transport = TransportProtocol.http_json
1648-
1649-
# AE cannot support streaming yet. Turn off streaming for now.
1650-
if a2a_agent_card.capabilities and a2a_agent_card.capabilities.streaming:
1651-
raise ValueError(
1652-
"Streaming is not supported in Agent Engine, please change "
1653-
"a2a_agent_card.capabilities.streaming to False."
1654-
)
1655-
1656-
if not hasattr(a2a_agent_card.capabilities, "streaming"):
1657-
a2a_agent_card.capabilities.streaming = False
1658-
1659-
# agent_card is set on the class_methods before set_up is invoked.
1660-
# Ensure that the agent_card url is set correctly before the client is created.
1661-
base_url = self.api_client._api_client._http_options.base_url.rstrip("/")
1662-
api_version = self.api_client._api_client._http_options.api_version
1663-
a2a_agent_card.url = f"{base_url}/{api_version}/{self.api_resource.name}/a2a"
1664-
1665-
# Using a2a client, inject the auth token from the global config.
1666-
config = ClientConfig(
1667-
supported_transports=[
1668-
TransportProtocol.http_json,
1669-
],
1670-
use_client_preference=True,
1671-
httpx_client=httpx.AsyncClient(
1672-
headers={
1673-
"Authorization": (
1674-
f"Bearer {self.api_client._api_client._credentials.token}"
1675-
)
1676-
}
1677-
),
1678-
)
1679-
factory = ClientFactory(config)
1680-
client = factory.create(a2a_agent_card)
1681-
1682-
if method_name == "on_message_send":
1683-
response = client.send_message(Message(**kwargs))
1684-
chunks = []
1685-
async for chunk in response:
1686-
chunks.append(chunk)
1687-
return chunks
1688-
elif method_name == "on_get_task":
1689-
response = await client.get_task(TaskQueryParams(**kwargs))
1690-
elif method_name == "on_cancel_task":
1691-
response = await client.cancel_task(TaskIdParams(**kwargs))
1692-
elif method_name == "handle_authenticated_agent_card":
1693-
response = await client.get_card()
1694-
else:
1695-
raise ValueError(f"Unknown method name: {method_name}")
1696-
1697-
return response
1698-
1699-
return _method
1700-
1701-
17021525
def _yield_parsed_json(http_response: google_genai_types.HttpResponse) -> Iterator[Any]:
17031526
"""Converts the body of the HTTP Response message to JSON format.
17041527

vertexai/_genai/agent_engines.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1335,7 +1335,6 @@ def _register_api_methods(
13351335
"async": _agent_engines_utils._wrap_async_query_operation,
13361336
"stream": _agent_engines_utils._wrap_stream_query_operation,
13371337
"async_stream": _agent_engines_utils._wrap_async_stream_query_operation,
1338-
"a2a_extension": _agent_engines_utils._wrap_a2a_operation,
13391338
},
13401339
)
13411340
except Exception as e:

0 commit comments

Comments
 (0)