Skip to content

Initial A2A Integration #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ format-fix = [
]
lint-check = [
"ruff check",
"mypy -p src"
"mypy -p src --exclude src/strands/multiagent"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥺 What's this needed for?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, is this because of the dependency conflict stuff? If so, can you add comments throughout the file indicating that?

Also, do we have a tracking issue for the dependency issue. I think the long-term fix is to remove the http exporter dependency from strands?

]
lint-fix = [
"ruff check --fix"
Expand Down Expand Up @@ -137,17 +137,27 @@ features = ["dev", "docs", "anthropic", "litellm", "llamaapi", "ollama", "otel"]
dev-mode = true
features = ["dev", "docs", "anthropic", "litellm", "llamaapi", "ollama", "a2a"]

[tool.hatch.envs.a2a.scripts]
run = [
"pytest{env:HATCH_TEST_ARGS:} tests/multiagent/a2a {args}"
]
run-cov = [
"pytest{env:HATCH_TEST_ARGS:} tests/multiagent/a2a --cov --cov-config=pyproject.toml {args}"
]
lint-check = [
"ruff check",
"mypy -p src/strands/multiagent/a2a"
]

[[tool.hatch.envs.hatch-test.matrix]]
python = ["3.13", "3.12", "3.11", "3.10"]


[tool.hatch.envs.hatch-test.scripts]
run = [
"pytest{env:HATCH_TEST_ARGS:} {args}"
"pytest{env:HATCH_TEST_ARGS:} {args} --ignore=tests/multiagent/a2a"
]
run-cov = [
"pytest{env:HATCH_TEST_ARGS:} --cov --cov-config=pyproject.toml {args}"
"pytest{env:HATCH_TEST_ARGS:} --cov --cov-config=pyproject.toml {args} --ignore=tests/multiagent/a2a"
]

cov-combine = []
Expand Down Expand Up @@ -181,6 +191,9 @@ prepare = [
"hatch fmt --formatter",
"hatch test --all"
]
test-a2a = [
"hatch -e a2a run run {args}"
]

[tool.mypy]
python_version = "3.10"
Expand Down
9 changes: 9 additions & 0 deletions src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def __init__(
record_direct_tool_call: bool = True,
load_tools_from_directory: bool = True,
trace_attributes: Optional[Mapping[str, AttributeValue]] = None,
*,
name: str | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we switch to Optional[str] for consistency elsewhere? (I like this syntax more TBH, but we use Optional everywhere else - issue to switch the library over maybe?)

description: str | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description has this example:

# Log that we're starting
logging.info("Starting A2A server with root logger")

strands_agent = Agent(model="us.anthropic.claude-3-haiku-20240307-v1:0", callback_handler=None)
strands_a2a_agent = A2AAgent(agent=strands_agent, name="Hello World Agent", description="Just a hello world agent")
strands_a2a_agent.serve()

But per this code, the name/description are now part of Agent correct? Could the PR description be updated?

):
"""Initialize the Agent with the specified configuration.

Expand Down Expand Up @@ -248,6 +251,10 @@ def __init__(
load_tools_from_directory: Whether to load and automatically reload tools in the `./tools/` directory.
Defaults to True.
trace_attributes: Custom trace attributes to apply to the agent's trace span.
name: name of the Agent
Defaults to None.
description: description of what the Agent does
Defaults to None.

Raises:
ValueError: If max_parallel_tools is less than 1.
Expand Down Expand Up @@ -310,6 +317,8 @@ def __init__(
self.trace_span: Optional[trace.Span] = None

self.tool_caller = Agent.ToolCaller(self)
self.name = name
self.description = description

@property
def tool(self) -> ToolCaller:
Expand Down
13 changes: 13 additions & 0 deletions src/strands/multiagent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Multiagent capabilities for Strands Agents.

This module provides support for multiagent systems, including agent-to-agent (A2A)
communication protocols and coordination mechanisms.

Submodules:
a2a: Implementation of the Agent-to-Agent (A2A) protocol, which enables
standardized communication between agents.
"""

from . import a2a

__all__ = ["a2a"]
14 changes: 14 additions & 0 deletions src/strands/multiagent/a2a/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Agent-to-Agent (A2A) communication protocol implementation for Strands Agents.

This module provides classes and utilities for enabling Strands Agents to communicate
with other agents using the Agent-to-Agent (A2A) protocol.

Docs: https://google-a2a.github.io/A2A/latest/

Classes:
A2AAgent: A wrapper that adapts a Strands Agent to be A2A-compatible.
"""

from .agent import A2AAgent

__all__ = ["A2AAgent"]
148 changes: 148 additions & 0 deletions src/strands/multiagent/a2a/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""A2A-compatible wrapper for Strands Agent.

This module provides the A2AAgent class, which adapts a Strands Agent to the A2A protocol,
allowing it to be used in A2A-compatible systems.
"""

import logging
from typing import Any, Literal

import uvicorn
from a2a.server.apps import A2AFastAPIApplication, A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from fastapi import FastAPI
from starlette.applications import Starlette

from ...agent.agent import Agent as SAAgent
from .executor import StrandsA2AExecutor

log = logging.getLogger(__name__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit -> I think our convention is logger =



class A2AAgent:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we also wanted the concept of an A2A agent that Strands can talk to like a StrandsAgent - what were we thinking with naming there?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As is, this seems more like a A2AAgentAdapter or something. A2AAgent makes me think it's a native A2AAgent that I can talk to like an Agent

"""A2A-compatible wrapper for Strands Agent."""

def __init__(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there anything we can do to mark this as experiemental/in-development? Maybe log an info message in the constructor or something?

self,
agent: SAAgent,
*,
# AgentCard
host: str = "0.0.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have a 4th zero or is this not an ip?

port: int = 9000,
version: str = "0.0.1",
):
"""Initialize an A2A-compatible agent from a Strands agent.

Args:
agent: The Strands Agent to wrap with A2A compatibility.
name: The name of the agent, used in the AgentCard.
description: A description of the agent's capabilities, used in the AgentCard.
host: The hostname or IP address to bind the A2A server to. Defaults to "localhost".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now it hosts to "0.0.0" actually

port: The port to bind the A2A server to. Defaults to 9000.
version: The version of the agent. Defaults to "0.0.1".
"""
self.host = host
self.port = port
self.http_url = f"http://{self.host}:{self.port}/"
self.version = version
self.strands_agent = agent
self.name = self.strands_agent.name
self.description = self.strands_agent.description
# TODO: enable configurable capabilities and request handler
self.capabilities = AgentCapabilities()
self.request_handler = DefaultRequestHandler(
agent_executor=StrandsA2AExecutor(self.strands_agent),
task_store=InMemoryTaskStore(),
)

@property
def public_agent_card(self) -> AgentCard:
"""Get the public AgentCard for this agent.

The AgentCard contains metadata about the agent, including its name,
description, URL, version, skills, and capabilities. This information
is used by other agents and systems to discover and interact with this agent.

Returns:
AgentCard: The public agent card containing metadata about this agent.

Raises:
ValueError: If name or description is None or empty.
"""
if not self.name:
raise ValueError("A2A agent name cannot be None or empty")
if not self.description:
raise ValueError("A2A agent description cannot be None or empty")

return AgentCard(
name=self.name,
description=self.description,
url=self.http_url,
version=self.version,
skills=self.agent_skills,
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=self.capabilities,
)

@property
def agent_skills(self) -> list[AgentSkill]:
"""Get the list of skills this agent provides.

Skills represent specific capabilities that the agent can perform.
Strands agent tools are adapted to A2A skills.

Returns:
list[AgentSkill]: A list of skills this agent provides.
"""
# TODO: translate Strands tools (native & MCP) to skills
return []

def to_starlette_app(self) -> Starlette:
"""Create a Starlette application for serving this agent via HTTP.

This method creates a Starlette application that can be used to serve
the agent via HTTP using the A2A protocol.

Returns:
Starlette: A Starlette application configured to serve this agent.
"""
return A2AStarletteApplication(agent_card=self.public_agent_card, http_handler=self.request_handler).build()

def to_fastapi_app(self) -> FastAPI:
"""Create a FastAPI application for serving this agent via HTTP.

This method creates a FastAPI application that can be used to serve
the agent via HTTP using the A2A protocol.

Returns:
FastAPI: A FastAPI application configured to serve this agent.
"""
return A2AFastAPIApplication(agent_card=self.public_agent_card, http_handler=self.request_handler).build()

def serve(self, app_type: Literal["fastapi", "starlette"] = "starlette", **kwargs: Any) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required by A2A or is this our thing?

Now that we have this concept, I sort of want it for Strands natively - or does that not make sense now that A2A exists?

"""Start the A2A server with the specified application type.

This method starts an HTTP server that exposes the agent via the A2A protocol.
The server can be implemented using either FastAPI or Starlette, depending on
the specified app_type.

Args:
app_type: The type of application to serve, either "fastapi" or "starlette".
Defaults to "starlette".
**kwargs: Additional keyword arguments to pass to uvicorn.run.
"""
try:
log.info("Starting Strands A2A server...")
if app_type == "fastapi":
uvicorn.run(self.to_fastapi_app(), host=self.host, port=self.port, **kwargs)
else:
uvicorn.run(self.to_starlette_app(), host=self.host, port=self.port, **kwargs)
except KeyboardInterrupt:
log.warning("Strands A2A server shutdown requested (KeyboardInterrupt).")
except Exception:
log.exception("Strands A2A server encountered exception.")
finally:
log.info("Strands A2A server has shutdown.")
67 changes: 67 additions & 0 deletions src/strands/multiagent/a2a/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Strands Agent executor for the A2A protocol.

This module provides the StrandsA2AExecutor class, which adapts a Strands Agent
to be used as an executor in the A2A protocol. It handles the execution of agent
requests and the conversion of Strands Agent responses to A2A events.
"""

import logging

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import UnsupportedOperationError
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError

from ...agent.agent import Agent as SAAgent
from ...agent.agent_result import AgentResult as SAAgentResult

log = logging.getLogger(__name__)


class StrandsA2AExecutor(AgentExecutor):
"""Executor that adapts a Strands Agent to the A2A protocol."""

def __init__(self, agent: SAAgent):
"""Initialize a StrandsA2AExecutor.

Args:
agent: The Strands Agent to adapt to the A2A protocol.
"""
self.agent = agent

async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
"""Execute a request using the Strands Agent and send the response as A2A events.

This method executes the user's input using the Strands Agent and converts
the agent's response to A2A events, which are then sent to the event queue.

Args:
context: The A2A request context, containing the user's input and other metadata.
event_queue: The A2A event queue, used to send response events.
"""
result: SAAgentResult = self.agent(context.get_user_input())
if result.message and "content" in result.message:
for content_block in result.message["content"]:
if "text" in content_block:
await event_queue.enqueue_event(new_agent_text_message(content_block["text"]))

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this as a use case for #81

"""Cancel an ongoing execution.

This method is called when a request is cancelled. Currently, cancellation
is not supported, so this method raises an UnsupportedOperationError.

Args:
context: The A2A request context.
event_queue: The A2A event queue.

Raises:
ServerError: Always raised with an UnsupportedOperationError, as cancellation
is not currently supported.
"""
raise ServerError(error=UnsupportedOperationError())
1 change: 1 addition & 0 deletions tests/multiagent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for the multiagent module."""
1 change: 1 addition & 0 deletions tests/multiagent/a2a/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Tests for the A2A module."""
41 changes: 41 additions & 0 deletions tests/multiagent/a2a/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Common fixtures for A2A module tests."""

from unittest.mock import AsyncMock, MagicMock

import pytest
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue

from strands.agent.agent import Agent as SAAgent
from strands.agent.agent_result import AgentResult as SAAgentResult


@pytest.fixture
def mock_strands_agent():
"""Create a mock Strands Agent for testing."""
agent = MagicMock(spec=SAAgent)
agent.name = "Test Agent"
agent.description = "A test agent for unit testing"

# Setup default response
mock_result = MagicMock(spec=SAAgentResult)
mock_result.message = {"content": [{"text": "Test response"}]}
agent.return_value = mock_result

return agent


@pytest.fixture
def mock_request_context():
"""Create a mock RequestContext for testing."""
context = MagicMock(spec=RequestContext)
context.get_user_input.return_value = "Test input"
return context


@pytest.fixture
def mock_event_queue():
"""Create a mock EventQueue for testing."""
queue = MagicMock(spec=EventQueue)
queue.enqueue_event = AsyncMock()
return queue
Loading