Skip to content

a2a streaming #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 8, 2025
Merged

a2a streaming #366

merged 4 commits into from
Jul 8, 2025

Conversation

jer96
Copy link
Member

@jer96 jer96 commented Jul 7, 2025

Description

  • Implemented streaming support for A2AServer.
    • Clients can now use the A2AServer in streaming and non-streaming mode. The underlying executor supports both modes of execution and uses the stream_async and invoke_async functions accordingly.
  • The A2AServer also supports Tasks where the request status is incrementally updated via the Task interface. This works gracefully in either streaming or synchronous mode. In streaming mode updates are sent via SSE, in sync mode an A2A response payload is returned with the task once it is complete.

Related Issues

Documentation PR

  • N/A coming soon

Type of Change

New feature

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Sample Code

A2A server with in sync or streaming mode

import logging
import sys

from strands import Agent
from strands.multiagent.a2a import A2AServer

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True,
)

# Log that we're starting
logging.info("Starting A2A server with root logger")

strands_agent = Agent(
    model="us.anthropic.claude-3-haiku-20240307-v1:0",
    name="Hello World Agent",
    description="Just a hello world agent",
    callback_handler=None
)

# uncomment to enable streaming
# strands_a2a_agent = A2AServer(agent=strands_agent, streaming=True)

# sync enabled
strands_a2a_agent = A2AServer(agent=strands_agent)

strands_a2a_agent.serve()

A2A client script in sync mode

import asyncio
import json
import logging
from typing import Any
from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
    AgentCard,
    MessageSendParams,
    SendMessageRequest,
    SendStreamingMessageRequest,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
PUBLIC_AGENT_CARD_PATH = "/.well-known/agent.json"
BASE_URL = "http://localhost:9000"


async def main() -> None:
    async with httpx.AsyncClient() as httpx_client:
        # Initialize A2ACardResolver
        resolver = A2ACardResolver(
            httpx_client=httpx_client,
            base_url=BASE_URL,
        )

        # Fetch Public Agent Card and Initialize Client
        agent_card: AgentCard | None = None

        try:
            logger.info("Attempting to fetch public agent card from: {} {}", BASE_URL, PUBLIC_AGENT_CARD_PATH)
            agent_card = await resolver.get_agent_card()  # Fetches from default public path
            logger.info("Successfully fetched public agent card:")
            logger.info(agent_card.model_dump_json(indent=2, exclude_none=True))
        except Exception as e:
            logger.exception("Critical error fetching public agent card")
            raise RuntimeError("Failed to fetch the public agent card. Cannot continue.") from e

        client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)
        logger.info("A2AClient initialized.")

        send_message_payload: dict[str, Any] = {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": "how much is 10 USD in INR?"}],
                "messageId": uuid4().hex,
            },
        }
        request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**send_message_payload))
        response = await client.send_message(request)
		print(response)

asyncio.run(main())

Sample output

... <omitted previous streamed events> ...
{
  "id": "dceae647-c661-4203-a99b-1ce2b28f8804",
  "jsonrpc": "2.0",
  "result": {
    "contextId": "0f99d6c9-2792-4277-a00f-ef07c4ab6b27",
    "final": false,
    "kind": "status-update",
    "status": {
      "message": {
        "contextId": "0f99d6c9-2792-4277-a00f-ef07c4ab6b27",
        "kind": "message",
        "messageId": "d10e403e-d103-449f-9a4e-e2e02a7585c0",
        "parts": [
          {
            "kind": "text",
            "text": " conversions."
          }
        ],
        "role": "agent",
        "taskId": "48d715cf-c11e-4c03-90da-22be14bc6e05"
      },
      "state": "working",
      "timestamp": "2025-07-07T15:48:36.118118+00:00"
    },
    "taskId": "48d715cf-c11e-4c03-90da-22be14bc6e05"
  }
}
{
  "id": "dceae647-c661-4203-a99b-1ce2b28f8804",
  "jsonrpc": "2.0",
  "result": {
    "artifact": {
      "artifactId": "9823991a-ae14-41e5-a3bb-d51b3a2d35dc",
      "name": "agent_response",
      "parts": [
        {
          "kind": "text",
          "text": "As of the current exchange rate, 10 USD (United States Dollars) is approximately equivalent to 821.50 INR (Indian Rupees).\n\nThe exact conversion rate can vary slightly depending on the source and the time of the conversion, but the current approximate conversion rate is:\n\n1 USD \u2248 82.15 INR\n\nTherefore, 10 USD \u2248 10 \u00d7 82.15 INR \u2248 821.50 INR.\n\nIt's worth noting that exchange rates can fluctuate over time, so the exact conversion value may change. It's always a good idea to check the current exchange rate when making currency conversions."
        }
      ]
    },
    "contextId": "0f99d6c9-2792-4277-a00f-ef07c4ab6b27",
    "kind": "artifact-update",
    "taskId": "48d715cf-c11e-4c03-90da-22be14bc6e05"
  }
}
{
  "id": "dceae647-c661-4203-a99b-1ce2b28f8804",
  "jsonrpc": "2.0",
  "result": {
    "contextId": "0f99d6c9-2792-4277-a00f-ef07c4ab6b27",
    "final": true,
    "kind": "status-update",
    "status": {
      "state": "completed",
      "timestamp": "2025-07-07T15:48:36.173200+00:00"
    },
    "taskId": "48d715cf-c11e-4c03-90da-22be14bc6e05"
  }
}

A2A client script in streaming mode

Modify the script above and replace the sync request with the following streaming request

request = SendStreamingMessageRequest(id=str(uuid4()), params=MessageSendParams(**send_message_payload))
async for event in client.send_message_streaming(request):
    print(json.dumps(event.model_dump(mode="json", exclude_none=True), indent=2))

Sample output

{
  "id": "2af88a6a-82a7-41a7-95b7-f8817da38bc9",
  "jsonrpc": "2.0",
  "result": {
    "artifacts": [
      {
        "artifactId": "db6f9fbf-afb4-4e97-a782-28c659d5b6e2",
        "name": "agent_response",
        "parts": [
          {
            "kind": "text",
            "text": "Okay, let me re-check the current exchange rate and provide you with the updated conversion of 10 USD to INR.\n\nAs of today's exchange rate, 10 USD is equivalent to approximately 830 INR.\n\nThe current exchange rate is approximately 1 USD = 83 INR.\n\nSo, to calculate 10 USD in INR:\n10 USD x 83 INR/USD = 830 INR\n\nTherefore, 10 USD is equal to approximately 830 Indian Rupees (INR) at the current exchange rate.\n\nPlease note that currency exchange rates fluctuate regularly, so it's always a good idea to check the latest rate when making currency conversions."
          }
        ]
      }
    ],
    "contextId": "e8d7ef9e-ccb9-4224-bbb0-aa21585d6803",
    "history": [
      {
        "contextId": "e8d7ef9e-ccb9-4224-bbb0-aa21585d6803",
        "kind": "message",
        "messageId": "6306e0b7dc3d41af8c5d1dbef56bc0ad",
        "parts": [
          {
            "kind": "text",
            "text": "how much is 10 USD in INR?"
          }
        ],
        "role": "user",
        "taskId": "ce2e498b-365f-48c3-9b32-1a52c71cce65"
      }
    ],
    "id": "ce2e498b-365f-48c3-9b32-1a52c71cce65",
    "kind": "task",
    "status": {
      "state": "completed",
      "timestamp": "2025-07-07T15:46:44.430558+00:00"
    }
  }
}

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@jer96 jer96 temporarily deployed to auto-approve July 7, 2025 15:19 — with GitHub Actions Inactive
@jer96 jer96 temporarily deployed to auto-approve July 7, 2025 15:30 — with GitHub Actions Inactive
@jer96 jer96 requested a review from zastrowm July 7, 2025 15:51
@jer96 jer96 self-assigned this Jul 7, 2025
@jer96 jer96 requested review from pgrayy and poshinchen July 7, 2025 15:52
@jer96 jer96 marked this pull request as ready for review July 7, 2025 16:02
@jer96 jer96 temporarily deployed to auto-approve July 8, 2025 13:35 — with GitHub Actions Inactive
@jer96 jer96 requested a review from zastrowm July 8, 2025 13:38
@jer96 jer96 enabled auto-merge (squash) July 8, 2025 13:40
@jer96 jer96 merged commit fd3752d into strands-agents:main Jul 8, 2025
22 checks passed
jsamuel1 pushed a commit to jsamuel1/sdk-python that referenced this pull request Jul 9, 2025
Co-authored-by: jer <jerebill@amazon.com>
malte-aws pushed a commit to malte-aws/sdk-python that referenced this pull request Jul 10, 2025
Co-authored-by: jer <jerebill@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants