Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7d0a0a2
Add OpenTelemetry GenAI auto-instrumentation for MLflow compatibility
Ladas Jan 30, 2026
81dcf06
Update uv.lock for opentelemetry-instrumentation-openai
Ladas Jan 31, 2026
64ca355
Add A2A context_id propagation to traces for MLflow session tracking
Ladas Feb 3, 2026
f8712b6
Use GenAI semantic convention attributes for MLflow
Ladas Feb 3, 2026
78ba8dc
🌱 Remove OpenInference, use GenAI-only instrumentation
Ladas Feb 3, 2026
56e6270
🌱 Add input/output attributes for MLflow compatibility
Ladas Feb 3, 2026
5446ab0
feat: Break trace chain for MLflow UI column support
Ladas Feb 4, 2026
15af22f
fix: Use INVALID_SPAN to properly create new trace
Ladas Feb 4, 2026
fcc9d66
fix: Use start_span with manual context for new root trace
Ladas Feb 4, 2026
96fc3da
fix: Attach empty OTEL context before creating root span
Ladas Feb 4, 2026
87a0e87
feat: Add observability module for OTEL/OpenInference setup
Ladas Feb 4, 2026
baf3312
refactor: Use observability module in weather agent
Ladas Feb 4, 2026
c1c7c52
fix: Preserve parent chain in agent spans for proper trace tree
Ladas Feb 4, 2026
8f9e366
feat: Add enrich_current_span to modify A2A root span in-place
Ladas Feb 4, 2026
c2f2db8
feat: Add all MLflow attributes directly in agent code
Ladas Feb 4, 2026
1c8b614
fix: Create span if no recording span exists in enrich_current_span
Ladas Feb 4, 2026
652be4d
feat: Add tracing middleware for root span control
Ladas Feb 4, 2026
a881c5e
fix: Break parent chain in middleware to create true root span
Ladas Feb 4, 2026
33a5eac
debug: Add logging to tracing middleware
Ladas Feb 4, 2026
7a892b1
debug: Add print statements to middleware
Ladas Feb 4, 2026
954f1c6
chore: Remove debug logging from tracing middleware
Ladas Feb 4, 2026
1cba214
feat: Add missing MLflow trace metadata attributes
Ladas Feb 4, 2026
6eabd0e
fix: Always recreate response after consuming body iterator
Ladas Feb 4, 2026
db74114
fix: Call set_span_output for streaming response capture
Ladas Feb 4, 2026
5f4b7ff
fix: Use ContextVar to pass root span for streaming output
Ladas Feb 5, 2026
6cb1100
fix: Use correct GenAI span naming convention
Ladas Feb 5, 2026
7249809
fix: Use Docker Hub base image instead of GHCR (auth issues)
Ladas Feb 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion a2a/weather_service/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
FROM python:3.12-slim-bookworm
ARG RELEASE_VERSION="main"

# Install uv
RUN pip install --no-cache-dir uv

WORKDIR /app
COPY . .
RUN uv sync --no-cache --locked --link-mode copy
Expand Down
7 changes: 6 additions & 1 deletion a2a/weather_service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ dependencies = [
"langchain-community>=0.3.9",
"langchain-ollama>=0.2.1",
"langchain-openai>=0.3.7",
"openinference-instrumentation-langchain>=0.1.36",
"pydantic-settings>=2.8.1",
"langchain-mcp-adapters>=0.1.0",
"python-keycloak>=5.5.1",
"opentelemetry-exporter-otlp",
# OpenTelemetry GenAI semantic convention instrumentation
# Emits spans with gen_ai.* attributes for MLflow compatibility
"opentelemetry-instrumentation-openai>=0.34b0",
# OpenInference for LangChain instrumentation and AGENT span semantics
"openinference-semantic-conventions>=0.1.12",
"openinference-instrumentation-langchain>=0.1.27",
]

[project.scripts]
Expand Down
18 changes: 4 additions & 14 deletions a2a/weather_service/src/weather_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
"""Weather Service - OpenTelemetry Observability Setup"""

def setup_tracer():
resource = Resource.create(attributes={
"service.name": "a2a-server",
})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
from weather_service.observability import setup_observability

setup_tracer()
# Initialize observability before importing agent
setup_observability()
89 changes: 50 additions & 39 deletions a2a/weather_service/src/weather_service/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
from a2a.server.tasks import InMemoryTaskStore, TaskUpdater
from a2a.types import AgentCapabilities, AgentCard, AgentSkill, TaskState, TextPart
from a2a.utils import new_agent_text_message, new_task
from openinference.instrumentation.langchain import LangChainInstrumentor
from langchain_core.messages import HumanMessage

from starlette.middleware.base import BaseHTTPMiddleware

from weather_service.graph import get_graph, get_mcpclient
from weather_service.observability import create_tracing_middleware, set_span_output, get_root_span

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

LangChainInstrumentor().instrument()


def get_agent_card(host: str, port: int):
"""Returns the Agent Card for the AG2 Agent."""
Expand Down Expand Up @@ -104,46 +104,55 @@ async def execute(self, context: RequestContext, event_queue: EventQueue):
task_updater = TaskUpdater(event_queue, task.id, task.context_id)
event_emitter = A2AEvent(task_updater)

# Get user input for the agent
user_input = context.get_user_input()

# Parse Messages
messages = [HumanMessage(content=context.get_user_input())]
messages = [HumanMessage(content=user_input)]
input = {"messages": messages}
logger.info(f'Processing messages: {input}')

task_updater = TaskUpdater(event_queue, task.id, task.context_id)
# Note: Root span with MLflow attributes is created by tracing middleware
# Here we just run the agent logic - spans from LangChain are auto-captured
output = None

# Test MCP connection first
logger.info(f'Attempting to connect to MCP server at: {os.getenv("MCP_URL", "http://localhost:8000/sse")}')

mcpclient = get_mcpclient()

# Try to get tools to verify connection
try:
output = None
# Test MCP connection first
logger.info(f'Attempting to connect to MCP server at: {os.getenv("MCP_URL", "http://localhost:8000/sse")}')

mcpclient = get_mcpclient()

# Try to get tools to verify connection
try:
tools = await mcpclient.get_tools()
logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}')
except Exception as tool_error:
logger.error(f'Failed to connect to MCP server: {tool_error}')
await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True)
return

graph = await get_graph(mcpclient)
async for event in graph.astream(input, stream_mode="updates"):
await event_emitter.emit_event(
"\n".join(
f"🚶‍♂️{key}: {str(value)[:256] + '...' if len(str(value)) > 256 else str(value)}"
for key, value in event.items()
)
+ "\n"
tools = await mcpclient.get_tools()
logger.info(f'Successfully connected to MCP server. Available tools: {[tool.name for tool in tools]}')
except Exception as tool_error:
logger.error(f'Failed to connect to MCP server: {tool_error}')
await event_emitter.emit_event(f"Error: Cannot connect to MCP weather service at {os.getenv('MCP_URL', 'http://localhost:8000/sse')}. Please ensure the weather MCP server is running. Error: {tool_error}", failed=True)
return

graph = await get_graph(mcpclient)
async for event in graph.astream(input, stream_mode="updates"):
await event_emitter.emit_event(
"\n".join(
f"🚶‍♂️{key}: {str(value)[:256] + '...' if len(str(value)) > 256 else str(value)}"
for key, value in event.items()
)
output = event
logger.info(f'event: {event}')
output = output.get("assistant", {}).get("final_answer")
await event_emitter.emit_event(str(output), final=True)
except Exception as e:
logger.error(f'Graph execution error: {e}')
await event_emitter.emit_event(f"Error: Failed to process weather request. {str(e)}", failed=True)
raise Exception(str(e))
+ "\n"
)
output = event
logger.info(f'event: {event}')
output = output.get("assistant", {}).get("final_answer")

# Set span output BEFORE emitting final event (for streaming response capture)
# This populates mlflow.spanOutputs, output.value, gen_ai.completion
# Use get_root_span() to get the middleware-created root span, not the
# current A2A span (trace.get_current_span() would return wrong span)
if output:
root_span = get_root_span()
if root_span and root_span.is_recording():
set_span_output(root_span, str(output))

await event_emitter.emit_event(str(output), final=True)

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""
Expand All @@ -166,7 +175,7 @@ def run():
agent_card=agent_card,
http_handler=request_handler,
)

# Build the Starlette app
app = server.build()

Expand All @@ -178,8 +187,10 @@ def run():
name='agent_card_new',
))

# Add middleware to log all incoming requests with headers

# Add tracing middleware - creates root span with MLflow/GenAI attributes
app.add_middleware(BaseHTTPMiddleware, dispatch=create_tracing_middleware())

# Add logging middleware
@app.middleware("http")
async def log_authorization_header(request, call_next):
auth_header = request.headers.get("authorization", "No Authorization header")
Expand Down
Loading