Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion a2a/weather_service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ dependencies = [
"pydantic-settings>=2.8.1",
"langchain-mcp-adapters>=0.1.0",
"python-keycloak>=5.5.1",
"opentelemetry-exporter-otlp",
# OpenTelemetry Core
"opentelemetry-api>=1.20.0",
"opentelemetry-sdk>=1.20.0",
"opentelemetry-exporter-otlp>=1.20.0",
"opentelemetry-exporter-otlp-proto-grpc>=1.20.0",
"opentelemetry-instrumentation>=0.41b0",
# OpenInference Semantic Conventions
"openinference-semantic-conventions>=0.1.0",
]

[project.scripts]
Expand Down
24 changes: 10 additions & 14 deletions a2a/weather_service/src/weather_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
"""
Weather Service - OpenTelemetry Observability Setup

def setup_tracer():
resource = Resource.create(attributes={
"service.name": "a2a-server",
})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
This module initializes OpenTelemetry tracing with OpenInference instrumentation
for automatic LLM observability in Phoenix.
"""

setup_tracer()
from weather_service.observability import setup_observability

# Set up OpenTelemetry tracing with OpenInference
# This must run before importing agent code to ensure instrumentation is active
setup_observability()
188 changes: 143 additions & 45 deletions a2a/weather_service/src/weather_service/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import uvicorn
import uuid
from textwrap import dedent

from a2a.server.agent_execution import AgentExecutor, RequestContext
Expand All @@ -10,16 +11,27 @@
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import AgentCapabilities, AgentCard, AgentSkill, TaskState, TextPart
from a2a.utils import new_agent_text_message, new_task
from openinference.instrumentation.langchain import LangChainInstrumentor
from langchain_core.messages import HumanMessage

from weather_service.graph import get_graph, get_mcpclient
from weather_service.observability import (
set_baggage_context,
extract_baggage_from_headers,
log_trace_info,
create_agent_span,
trace_context_from_headers,
)

# Import OpenInference context manager for adding attributes to all spans
try:
from openinference.instrumentation import using_attributes
except ImportError:
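# Fallback if openinference-instrumentation is not installed.
# NOTE(review): contextlib.nullcontext only accepts an optional enter_result
# argument, so calling this fallback with session_id=/user_id=/metadata=
# keyword arguments (as execute() does) would raise TypeError - confirm and
# replace with a small no-op context manager that accepts **kwargs.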
from contextlib import nullcontext as using_attributes

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

LangChainInstrumentor().instrument()


def get_agent_card(host: str, port: int):
"""Returns the Agent Card for the AG2 Agent."""
Expand Down Expand Up @@ -95,54 +107,140 @@ async def execute(self, context: RequestContext, event_queue: EventQueue):
The agent allows retrieving weather information through a natural-language conversational interface
"""

# Setup Event Emitter
task = context.current_task
if not task:
task = new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
task_updater = TaskUpdater(event_queue, task.id, task.context_id)
event_emitter = A2AEvent(task_updater)
# ============================================================
# OTEL TRACE CONTEXT AND BAGGAGE PROPAGATION
# Extract trace context (traceparent) and baggage from HTTP headers
# This enables proper parent-child span relationships across services
# ============================================================

# Extract headers from request context (if available)
# A2A SDK may expose headers differently, so we handle both cases
headers = {}
if hasattr(context, 'headers'):
headers = dict(context.headers)
elif hasattr(context, 'message') and hasattr(context.message, 'headers'):
headers = dict(context.message.headers) if context.message.headers else {}

# Log incoming trace context for debugging
traceparent = headers.get('traceparent', headers.get('Traceparent', 'none'))
logger.info(f"🔗 Incoming traceparent: {traceparent}")

# Extract baggage from headers
baggage_data = extract_baggage_from_headers(headers)

# If no request_id in headers, generate one
if 'request_id' not in baggage_data:
baggage_data['request_id'] = f"req-{uuid.uuid4()}"

# If no user_id in headers, use anonymous
if 'user_id' not in baggage_data:
baggage_data['user_id'] = "anonymous"

# Add task/context IDs from A2A if available
if context.current_task:
baggage_data['task_id'] = context.current_task.id
baggage_data['context_id'] = context.current_task.context_id

logger.info(f"🔍 Baggage context: {baggage_data}")

# ============================================================
# WRAP ENTIRE EXECUTION WITH TRACE CONTEXT
# This ensures all spans created are children of the incoming trace
# ============================================================
with trace_context_from_headers(headers):
# Set baggage context within the trace context
set_baggage_context(baggage_data)

# Parse Messages
messages = [HumanMessage(content=context.get_user_input())]
input = {"messages": messages}
logger.info(f'Processing messages: {input}')
# Log trace info for debugging
log_trace_info()

task_updater = TaskUpdater(event_queue, task.id, task.context_id)
# ============================================================
# Setup Event Emitter
# ============================================================
task = context.current_task
if not task:
task = new_task(context.message) # type: ignore
await event_queue.enqueue_event(task)
task_updater = TaskUpdater(event_queue, task.id, task.context_id)
event_emitter = A2AEvent(task_updater)

try:
output = None
# Test MCP connection first
logger.info(f'Attempting to connect to MCP server at: {os.getenv("MCP_URL", "http://localhost:8000/sse")}')
# Parse Messages
messages = [HumanMessage(content=context.get_user_input())]
input = {"messages": messages}
logger.info(f'Processing messages: {input}')

mcpclient = get_mcpclient()
task_updater = TaskUpdater(event_queue, task.id, task.context_id)

# Try to get tools to verify connection
try:
tools = await mcpclient.get_tools()
logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}')
except Exception as tool_error:
logger.error(f'Failed to connect to MCP server: {tool_error}')
await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True)
return

graph = await get_graph(mcpclient)
async for event in graph.astream(input, stream_mode="updates"):
await event_emitter.emit_event(
"\n".join(
f"🚶‍♂️{key}: {str(value)[:100] + '...' if len(str(value)) > 100 else str(value)}"
for key, value in event.items()
)
+ "\n"
)
output = event
logger.info(f'event: {event}')
output = output.get("assistant", {}).get("final_answer")
await event_emitter.emit_event(str(output), final=True)
except Exception as e:
logger.error(f'Graph execution error: {e}')
await event_emitter.emit_event(f"Error: Failed to process weather request. {str(e)}", failed=True)
raise Exception(str(e))
output = None
# Test MCP connection first
logger.info(f'Attempting to connect to MCP server at: {os.getenv("MCP_URL", "http://localhost:8000/sse")}')

mcpclient = get_mcpclient()

# Try to get tools to verify connection
try:
tools = await mcpclient.get_tools()
logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}')
except Exception as tool_error:
logger.error(f'Failed to connect to MCP server: {tool_error}')
await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True)
return

graph = await get_graph(mcpclient)

# ============================================================
# OPENINFERENCE TRACE CONTEXT
# Wrap graph execution with using_attributes and create_agent_span
# This ensures all LangChain spans have session.id, user.id, etc.
# and are nested under a root AGENT span for easy filtering
# ============================================================
user_input = context.get_user_input()

# Prepare OpenInference attributes
oi_session_id = task.context_id if task else baggage_data.get('context_id')
oi_user_id = baggage_data.get('user_id', 'anonymous')
oi_metadata = {
"task_id": task.id if task else baggage_data.get('task_id'),
"request_id": baggage_data.get('request_id'),
}

# using_attributes adds session.id, user.id to ALL spans in scope
# create_agent_span creates a root AGENT span for the conversation
with using_attributes(
session_id=oi_session_id,
user_id=oi_user_id,
metadata=oi_metadata,
):
with create_agent_span(
name="weather_agent_task",
task_id=task.id if task else None,
context_id=task.context_id if task else None,
user_id=oi_user_id,
input_text=user_input,
) as agent_span:
async for event in graph.astream(input, stream_mode="updates"):
await event_emitter.emit_event(
"\n".join(
f"🚶‍♂️{key}: {str(value)[:100] + '...' if len(str(value)) > 100 else str(value)}"
for key, value in event.items()
)
+ "\n"
)
output = event
logger.info(f'event: {event}')

output = output.get("assistant", {}).get("final_answer")

# Set output on the agent span
if output:
agent_span.set_attribute("output.value", str(output))

await event_emitter.emit_event(str(output), final=True)
except Exception as e:
logger.error(f'Graph execution error: {e}')
await event_emitter.emit_event(f"Error: Failed to process weather request. {str(e)}", failed=True)
raise Exception(str(e))

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""
Expand Down
Loading