Skip to content

Initial A2A Integration #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open

Initial A2A Integration #218

wants to merge 22 commits into from

Conversation

jer96
Copy link
Contributor

@jer96 jer96 commented Jun 13, 2025

Description

This PR seeks to integrate Google's A2A protocol into Strands agents

Related Issues

Documentation PR

N/A

Type of Change

  • New feature

Limitations and Next Steps

  • This is the most basic integration with A2A. A Strands agent is wrapped in an A2AAgentClass which currently only supports messages.
  • Currently there is a dependency conflict with opentelemetry-exporter-otlp-proto-http and a2a-sdk dependencies. a2a-sdk relies on protobuf>=6.3.1, but opentelemetry-exporter-otlp-proto-http requires <6.0. This warranted the dependency changes to pyproject.toml and implementation changes to tracer.py. I plan on documenting the conflict and submitting issues to both the opentelemetry-exporter-otlp-proto-http and the a2a-sdk repo.
  • Unit tests for the A2A integration are currently not run because of the dependency conflict mentioned above. I will investigate how to run unit tests for two separate environments in the interim.
  • Native Strands tools and MCP tool integration
  • Resolve dependency and build papercuts
  • Documentation

Testing

  • unit tests
  • manual testing

sample server: a2a_server.py

import logging
import sys

from strands import Agent
from strands.multiagent.a2a import A2AAgent

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True,
)

# Log that we're starting
logging.info("Starting A2A server with root logger")

strands_agent = Agent(model="us.anthropic.claude-3-haiku-20240307-v1:0", callback_handler=None)
strands_a2a_agent = A2AAgent(agent=strands_agent, name="Hello World Agent", description="Just a hello world agent")
strands_a2a_agent.serve()

sample client a2a_client.py

import logging
from typing import Any
from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    AgentCard,
    MessageSendParams,
    SendMessageRequest,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
PUBLIC_AGENT_CARD_PATH = "/.well-known/agent.json"
BASE_URL = "http://localhost:9000"


async def main() -> None:
    async with httpx.AsyncClient() as httpx_client:
        # Initialize A2ACardResolver
        resolver = A2ACardResolver(
            httpx_client=httpx_client,
            base_url=BASE_URL,
        )

        # Fetch Public Agent Card and Initialize Client
        agent_card: AgentCard | None = None

        try:
            logger.info("Attempting to fetch public agent card from: {} {}", BASE_URL, PUBLIC_AGENT_CARD_PATH)
            agent_card = await resolver.get_agent_card()  # Fetches from default public path
            logger.info("Successfully fetched public agent card:")
            logger.info(agent_card.model_dump_json(indent=2, exclude_none=True))
        except Exception as e:
            logger.exception("Critical error fetching public agent card")
            raise RuntimeError("Failed to fetch the public agent card. Cannot continue.") from e

        client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)
        logger.info("A2AClient initialized.")

        send_message_payload: dict[str, Any] = {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": "how much is 10 USD in INR?"}],
                "messageId": uuid4().hex,
            },
        }
        request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**send_message_payload))

        response = await client.send_message(request)
        print(response.model_dump(mode="json", exclude_none=True))


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Run python a2a_server.py and python a2a_client.py in separate terminal panes and observe output.

Standard Testing

  • hatch fmt --linter
  • hatch fmt --formatter
  • hatch test --all
  • Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

Checklist

  • I have read the CONTRIBUTING document
  • I have added tests that prove my fix is effective or my feature works
  • [] I have updated the documentation accordingly
  • [] I have added an appropriate example to the documentation to outline the feature
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@mikegc-aws
Copy link

Thanks for this. Let's get the changes to pyproject.toml that give the feature options for otel in it's own PR so we can get this deployed ASAP and unblock A2A. Then follow up with the implementation stuff? Thoughts?

@jer96
Copy link
Contributor Author

jer96 commented Jun 16, 2025

@mikegc-aws yes great callout, i'll extract that into a separate PR and ship that first.

@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:24 — with GitHub Actions Inactive
@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:32 — with GitHub Actions Inactive
@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:40 — with GitHub Actions Inactive
@zastrowm zastrowm marked this pull request as draft June 18, 2025 16:43
@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:49 — with GitHub Actions Inactive
@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:51 — with GitHub Actions Inactive
@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:53 — with GitHub Actions Inactive
@jer96 jer96 temporarily deployed to auto-approve June 18, 2025 16:58 — with GitHub Actions Inactive
@jer96 jer96 marked this pull request as ready for review June 18, 2025 17:11
@jer96 jer96 changed the title [WIP] Initial A2A Integration Initial A2A Integration Jun 18, 2025
@@ -108,7 +108,7 @@ format-fix = [
]
lint-check = [
"ruff check",
"mypy -p src"
"mypy -p src --exclude src/strands/multiagent"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥺 What's this needed for?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, is this because of the dependency conflict stuff? If so, can you add comments throughout the file indicating that?

Also, do we have a tracking issue for the dependency issue. I think the long-term fix is to remove the http exporter dependency from strands?

@@ -216,6 +216,9 @@ def __init__(
record_direct_tool_call: bool = True,
load_tools_from_directory: bool = True,
trace_attributes: Optional[Mapping[str, AttributeValue]] = None,
*,
name: str | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we switch to Optional[str] for consistency elsewhere? (I like this syntax more TBH, but we use Optional everywhere else - issue to switch the library over maybe?)

from ...agent.agent import Agent as SAAgent
from .executor import StrandsA2AExecutor

log = logging.getLogger(__name__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit -> I think our convention is logger =

log = logging.getLogger(__name__)


class A2AAgent:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we also wanted the concept of an A2A agent that Strands can talk to like a StrandsAgent - what were we thinking with naming there?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As is, this seems more like a A2AAgentAdapter or something. A2AAgent makes me think it's a native A2AAgent that I can talk to like an Agent

agent: SAAgent,
*,
# AgentCard
host: str = "0.0.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have a 4th zero or is this not an ip?

agent: The Strands Agent to wrap with A2A compatibility.
name: The name of the agent, used in the AgentCard.
description: A description of the agent's capabilities, used in the AgentCard.
host: The hostname or IP address to bind the A2A server to. Defaults to "localhost".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now it hosts to "0.0.0" actually

class A2AAgent:
"""A2A-compatible wrapper for Strands Agent."""

def __init__(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there anything we can do to mark this as experiemental/in-development? Maybe log an info message in the constructor or something?

"""
return A2AFastAPIApplication(agent_card=self.public_agent_card, http_handler=self.request_handler).build()

def serve(self, app_type: Literal["fastapi", "starlette"] = "starlette", **kwargs: Any) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required by A2A or is this our thing?

Now that we have this concept, I sort of want it for Strands natively - or does that not make sense now that A2A exists?

if "text" in content_block:
await event_queue.enqueue_event(new_agent_text_message(content_block["text"]))

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this as a use case for #81

@@ -216,6 +216,9 @@ def __init__(
record_direct_tool_call: bool = True,
load_tools_from_directory: bool = True,
trace_attributes: Optional[Mapping[str, AttributeValue]] = None,
*,
name: str | None = None,
description: str | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description has this example:

# Log that we're starting
logging.info("Starting A2A server with root logger")

strands_agent = Agent(model="us.anthropic.claude-3-haiku-20240307-v1:0", callback_handler=None)
strands_a2a_agent = A2AAgent(agent=strands_agent, name="Hello World Agent", description="Just a hello world agent")
strands_a2a_agent.serve()

But per this code, the name/description are now part of Agent correct? Could the PR description be updated?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants