-
Notifications
You must be signed in to change notification settings - Fork 151
Initial A2A Integration #218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ad7b3d9
10c9bb1
5fe7f9a
8ae3c39
b87b1ed
23c31f9
25c331c
82f251e
f0d505e
3ed915b
a522a3f
9d981f0
8f2561e
f7c0f83
045897f
6f1543e
9a3f5e7
67ccda0
84d1a78
a727d14
f301054
b930c09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,6 +216,9 @@ def __init__( | |
record_direct_tool_call: bool = True, | ||
load_tools_from_directory: bool = True, | ||
trace_attributes: Optional[Mapping[str, AttributeValue]] = None, | ||
*, | ||
name: str | None = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we switch to |
||
description: str | None = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The PR description has this example:
But per this code, the name/description are now part of Agent correct? Could the PR description be updated? |
||
): | ||
"""Initialize the Agent with the specified configuration. | ||
|
||
|
@@ -248,6 +251,10 @@ def __init__( | |
load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory. | ||
Defaults to True. | ||
trace_attributes: Custom trace attributes to apply to the agent's trace span. | ||
name: name of the Agent | ||
Defaults to None. | ||
description: description of what the Agent does | ||
Defaults to None. | ||
|
||
Raises: | ||
ValueError: If max_parallel_tools is less than 1. | ||
|
@@ -310,6 +317,8 @@ def __init__( | |
self.trace_span: Optional[trace.Span] = None | ||
|
||
self.tool_caller = Agent.ToolCaller(self) | ||
self.name = name | ||
self.description = description | ||
|
||
@property | ||
def tool(self) -> ToolCaller: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
"""Multiagent capabilities for Strands Agents. | ||
|
||
This module provides support for multiagent systems, including agent-to-agent (A2A) | ||
communication protocols and coordination mechanisms. | ||
|
||
Submodules: | ||
a2a: Implementation of the Agent-to-Agent (A2A) protocol, which enables | ||
standardized communication between agents. | ||
""" | ||
|
||
from . import a2a | ||
|
||
__all__ = ["a2a"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
"""Agent-to-Agent (A2A) communication protocol implementation for Strands Agents. | ||
|
||
This module provides classes and utilities for enabling Strands Agents to communicate | ||
with other agents using the Agent-to-Agent (A2A) protocol. | ||
|
||
Docs: https://google-a2a.github.io/A2A/latest/ | ||
|
||
Classes: | ||
A2AAgent: A wrapper that adapts a Strands Agent to be A2A-compatible. | ||
""" | ||
|
||
from .agent import A2AAgent | ||
|
||
__all__ = ["A2AAgent"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
"""A2A-compatible wrapper for Strands Agent. | ||
|
||
This module provides the A2AAgent class, which adapts a Strands Agent to the A2A protocol, | ||
allowing it to be used in A2A-compatible systems. | ||
""" | ||
|
||
import logging | ||
from typing import Any, Literal | ||
|
||
import uvicorn | ||
from a2a.server.apps import A2AFastAPIApplication, A2AStarletteApplication | ||
from a2a.server.request_handlers import DefaultRequestHandler | ||
from a2a.server.tasks import InMemoryTaskStore | ||
from a2a.types import AgentCapabilities, AgentCard, AgentSkill | ||
from fastapi import FastAPI | ||
from starlette.applications import Starlette | ||
|
||
from ...agent.agent import Agent as SAAgent | ||
from .executor import StrandsA2AExecutor | ||
|
||
log = logging.getLogger(__name__) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit -> I think our convention is |
||
|
||
|
||
class A2AAgent: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know we also wanted the concept of an A2A agent that Strands can talk to like a StrandsAgent - what were we thinking with naming there? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As is, this seems more like a |
||
"""A2A-compatible wrapper for Strands Agent.""" | ||
|
||
def __init__( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there anything we can do to mark this as experiemental/in-development? Maybe log an info message in the constructor or something? |
||
self, | ||
agent: SAAgent, | ||
*, | ||
# AgentCard | ||
host: str = "0.0.0", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this have a 4th zero or is this not an ip? |
||
port: int = 9000, | ||
version: str = "0.0.1", | ||
): | ||
"""Initialize an A2A-compatible agent from a Strands agent. | ||
|
||
Args: | ||
agent: The Strands Agent to wrap with A2A compatibility. | ||
name: The name of the agent, used in the AgentCard. | ||
description: A description of the agent's capabilities, used in the AgentCard. | ||
host: The hostname or IP address to bind the A2A server to. Defaults to "localhost". | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now it hosts to "0.0.0" actually |
||
port: The port to bind the A2A server to. Defaults to 9000. | ||
version: The version of the agent. Defaults to "0.0.1". | ||
""" | ||
self.host = host | ||
self.port = port | ||
self.http_url = f"http://{self.host}:{self.port}/" | ||
self.version = version | ||
self.strands_agent = agent | ||
self.name = self.strands_agent.name | ||
self.description = self.strands_agent.description | ||
# TODO: enable configurable capabilities and request handler | ||
self.capabilities = AgentCapabilities() | ||
self.request_handler = DefaultRequestHandler( | ||
agent_executor=StrandsA2AExecutor(self.strands_agent), | ||
task_store=InMemoryTaskStore(), | ||
) | ||
|
||
@property | ||
def public_agent_card(self) -> AgentCard: | ||
"""Get the public AgentCard for this agent. | ||
|
||
The AgentCard contains metadata about the agent, including its name, | ||
description, URL, version, skills, and capabilities. This information | ||
is used by other agents and systems to discover and interact with this agent. | ||
|
||
Returns: | ||
AgentCard: The public agent card containing metadata about this agent. | ||
|
||
Raises: | ||
ValueError: If name or description is None or empty. | ||
""" | ||
if not self.name: | ||
raise ValueError("A2A agent name cannot be None or empty") | ||
if not self.description: | ||
raise ValueError("A2A agent description cannot be None or empty") | ||
|
||
return AgentCard( | ||
name=self.name, | ||
description=self.description, | ||
url=self.http_url, | ||
version=self.version, | ||
skills=self.agent_skills, | ||
defaultInputModes=["text"], | ||
defaultOutputModes=["text"], | ||
capabilities=self.capabilities, | ||
) | ||
|
||
@property | ||
def agent_skills(self) -> list[AgentSkill]: | ||
"""Get the list of skills this agent provides. | ||
|
||
Skills represent specific capabilities that the agent can perform. | ||
Strands agent tools are adapted to A2A skills. | ||
|
||
Returns: | ||
list[AgentSkill]: A list of skills this agent provides. | ||
""" | ||
# TODO: translate Strands tools (native & MCP) to skills | ||
return [] | ||
|
||
def to_starlette_app(self) -> Starlette: | ||
"""Create a Starlette application for serving this agent via HTTP. | ||
|
||
This method creates a Starlette application that can be used to serve | ||
the agent via HTTP using the A2A protocol. | ||
|
||
Returns: | ||
Starlette: A Starlette application configured to serve this agent. | ||
""" | ||
return A2AStarletteApplication(agent_card=self.public_agent_card, http_handler=self.request_handler).build() | ||
|
||
def to_fastapi_app(self) -> FastAPI: | ||
"""Create a FastAPI application for serving this agent via HTTP. | ||
|
||
This method creates a FastAPI application that can be used to serve | ||
the agent via HTTP using the A2A protocol. | ||
|
||
Returns: | ||
FastAPI: A FastAPI application configured to serve this agent. | ||
""" | ||
return A2AFastAPIApplication(agent_card=self.public_agent_card, http_handler=self.request_handler).build() | ||
|
||
def serve(self, app_type: Literal["fastapi", "starlette"] = "starlette", **kwargs: Any) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this required by A2A or is this our thing? Now that we have this concept, I sort of want it for Strands natively - or does that not make sense now that A2A exists? |
||
"""Start the A2A server with the specified application type. | ||
|
||
This method starts an HTTP server that exposes the agent via the A2A protocol. | ||
The server can be implemented using either FastAPI or Starlette, depending on | ||
the specified app_type. | ||
|
||
Args: | ||
app_type: The type of application to serve, either "fastapi" or "starlette". | ||
Defaults to "starlette". | ||
**kwargs: Additional keyword arguments to pass to uvicorn.run. | ||
""" | ||
try: | ||
log.info("Starting Strands A2A server...") | ||
if app_type == "fastapi": | ||
uvicorn.run(self.to_fastapi_app(), host=self.host, port=self.port, **kwargs) | ||
else: | ||
uvicorn.run(self.to_starlette_app(), host=self.host, port=self.port, **kwargs) | ||
except KeyboardInterrupt: | ||
log.warning("Strands A2A server shutdown requested (KeyboardInterrupt).") | ||
except Exception: | ||
log.exception("Strands A2A server encountered exception.") | ||
finally: | ||
log.info("Strands A2A server has shutdown.") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
"""Strands Agent executor for the A2A protocol. | ||
|
||
This module provides the StrandsA2AExecutor class, which adapts a Strands Agent | ||
to be used as an executor in the A2A protocol. It handles the execution of agent | ||
requests and the conversion of Strands Agent responses to A2A events. | ||
""" | ||
|
||
import logging | ||
|
||
from a2a.server.agent_execution import AgentExecutor, RequestContext | ||
from a2a.server.events import EventQueue | ||
from a2a.types import UnsupportedOperationError | ||
from a2a.utils import new_agent_text_message | ||
from a2a.utils.errors import ServerError | ||
|
||
from ...agent.agent import Agent as SAAgent | ||
from ...agent.agent_result import AgentResult as SAAgentResult | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class StrandsA2AExecutor(AgentExecutor): | ||
"""Executor that adapts a Strands Agent to the A2A protocol.""" | ||
|
||
def __init__(self, agent: SAAgent): | ||
"""Initialize a StrandsA2AExecutor. | ||
|
||
Args: | ||
agent: The Strands Agent to adapt to the A2A protocol. | ||
""" | ||
self.agent = agent | ||
|
||
async def execute( | ||
self, | ||
context: RequestContext, | ||
event_queue: EventQueue, | ||
) -> None: | ||
"""Execute a request using the Strands Agent and send the response as A2A events. | ||
|
||
This method executes the user's input using the Strands Agent and converts | ||
the agent's response to A2A events, which are then sent to the event queue. | ||
|
||
Args: | ||
context: The A2A request context, containing the user's input and other metadata. | ||
event_queue: The A2A event queue, used to send response events. | ||
""" | ||
result: SAAgentResult = self.agent(context.get_user_input()) | ||
if result.message and "content" in result.message: | ||
for content_block in result.message["content"]: | ||
if "text" in content_block: | ||
await event_queue.enqueue_event(new_agent_text_message(content_block["text"])) | ||
|
||
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add this as a use case for #81 |
||
"""Cancel an ongoing execution. | ||
|
||
This method is called when a request is cancelled. Currently, cancellation | ||
is not supported, so this method raises an UnsupportedOperationError. | ||
|
||
Args: | ||
context: The A2A request context. | ||
event_queue: The A2A event queue. | ||
|
||
Raises: | ||
ServerError: Always raised with an UnsupportedOperationError, as cancellation | ||
is not currently supported. | ||
""" | ||
raise ServerError(error=UnsupportedOperationError()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Tests for the multiagent module.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Tests for the A2A module.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
"""Common fixtures for A2A module tests.""" | ||
|
||
from unittest.mock import AsyncMock, MagicMock | ||
|
||
import pytest | ||
from a2a.server.agent_execution import RequestContext | ||
from a2a.server.events import EventQueue | ||
|
||
from strands.agent.agent import Agent as SAAgent | ||
from strands.agent.agent_result import AgentResult as SAAgentResult | ||
|
||
|
||
@pytest.fixture | ||
def mock_strands_agent(): | ||
"""Create a mock Strands Agent for testing.""" | ||
agent = MagicMock(spec=SAAgent) | ||
agent.name = "Test Agent" | ||
agent.description = "A test agent for unit testing" | ||
|
||
# Setup default response | ||
mock_result = MagicMock(spec=SAAgentResult) | ||
mock_result.message = {"content": [{"text": "Test response"}]} | ||
agent.return_value = mock_result | ||
|
||
return agent | ||
|
||
|
||
@pytest.fixture | ||
def mock_request_context(): | ||
"""Create a mock RequestContext for testing.""" | ||
context = MagicMock(spec=RequestContext) | ||
context.get_user_input.return_value = "Test input" | ||
return context | ||
|
||
|
||
@pytest.fixture | ||
def mock_event_queue(): | ||
"""Create a mock EventQueue for testing.""" | ||
queue = MagicMock(spec=EventQueue) | ||
queue.enqueue_event = AsyncMock() | ||
return queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥺 What's this needed for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, is this because of the dependency conflict stuff? If so, can you add comments throughout the file indicating that?
Also, do we have a tracking issue for the dependency issue. I think the long-term fix is to remove the http exporter dependency from strands?