Skip to content

🐛 Bug Report: Trace graph is incorrectly stitched for Langgraph agents. #3203

@yash025

Description

@yash025

Which component is this bug for?

Langchain Instrumentation

📜 Description

The trace graph is incorrect: spans created inside the nodes are not listed under the node spans; they are listed under the parent/root span instead.

👟 Reproduction steps

Run the project with opentelemetry-instrument. Below is the command I am using to run the code:

opentelemetry-instrument \
  --traces_exporter otlp \
  --metrics_exporter none \
  --service_name test_service \
  --exporter_otlp_endpoint http://localhost:4317 \
  --exporter_otlp_insecure true \
  poetry run python test_agent.py

👍 Expected behavior

Ideally, spans created inside a node should appear under the span for that node, i.e. the spans created by the Traceloop callback handler (the LangChain spans).

👎 Actual Behavior with Screenshots

Image: here the POST span should have appeared under the http_call.task span, and test_agent_span should have appeared under the otel_span.task span.

🤖 Python Version

3.11

📃 Provide any additional context for the Bug.

Below is the Python code to reproduce the issue:

import asyncio
import json
import logging
from typing import TypedDict

import httpx
from langgraph.graph import END, START, StateGraph
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Set up logging: configure the root logger at INFO and create a logger
# named after this module for the messages emitted below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Set up OpenTelemetry: install a fresh SDK TracerProvider as the global
# provider and obtain a tracer for this module. No span processor or
# exporter is attached to the provider here.
# NOTE(review): when launched through `opentelemetry-instrument`, a global
# provider may already be configured before this line runs -- confirm which
# provider actually ends up serving `tracer`.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Imported after the tracer provider has been set (kept mid-file as in the
# original reproduction).
from opentelemetry.instrumentation.langchain import LangchainInstrumentor

# Enable the LangChain instrumentation before any graph is built or invoked.
LangchainInstrumentor().instrument()



class TestAgentState(TypedDict):
    """Simple state for the test agent.

    This is the state schema handed to ``StateGraph`` and the type of the
    ``state`` argument received by each node function.
    """
    # Human-readable outcome of the HTTP call; returned by http_call_node.
    http_result: str
    # Human-readable outcome of the span creation; returned by
    # opentelemetry_span_node.
    span_result: str
    # Starts as an empty list in run_test_agent; the nodes in this script
    # neither read nor update it.
    messages: list


async def http_call_node(state: TestAgentState) -> dict:
    """
    Graph node: POST two numbers to httpbin and report their sum.

    The payload ``{"a": 10, "b": 25}`` is sent to ``https://httpbin.org/post``.
    On a 200 response the ``"json"`` field of the reply is read back and its
    ``a`` and ``b`` values are added locally. Any other status code, or any
    exception raised along the way, is turned into an error message instead
    of being propagated.

    Args:
        state: Current agent state (not read by this node).

    Returns:
        A partial state update ``{"http_result": <message>}``.
    """
    logger.info("Starting HTTP call node...")

    payload = {"a": 10, "b": 25}

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://httpbin.org/post",
                json=payload,
                timeout=10.0,
            )

            if response.status_code != 200:
                http_result = f"HTTP call failed with status: {response.status_code}"
                logger.error(http_result)
            else:
                # Read back the JSON body the endpoint reports having received.
                echoed = response.json().get("json", {})
                total = echoed.get("a", 0) + echoed.get("b", 0)

                http_result = f"HTTP call successful! Sum of {echoed.get('a')} + {echoed.get('b')} = {total}"
                logger.info(http_result)

    except Exception as e:
        # Deliberately broad: the node reports the failure through the state
        # rather than aborting the graph run.
        http_result = f"HTTP call error: {str(e)}"
        logger.error(http_result)

    return {"http_result": http_result}


async def opentelemetry_span_node(state: TestAgentState) -> dict:
    """
    Graph node: open a manual OpenTelemetry span and annotate it.

    A span named ``test_agent_span`` is started as the current span, tagged
    with a few static attributes, given the previous node's ``http_result``
    as an attribute, and decorated with progress events around a short
    ``asyncio.sleep``.

    Args:
        state: Current agent state; ``http_result`` is read if present.

    Returns:
        A partial state update ``{"span_result": <message>}`` whose message
        includes the span ID of the created span.
    """
    logger.info("Starting OpenTelemetry span node...")

    static_attributes = (
        ("node.name", "opentelemetry_span_node"),
        ("agent.type", "test_agent"),
        ("operation.type", "span_creation"),
    )

    with tracer.start_as_current_span("test_agent_span") as span:
        for attr_key, attr_value in static_attributes:
            span.set_attribute(attr_key, attr_value)

        span.add_event("Starting span processing")

        # Yield to the event loop briefly while the span is open.
        await asyncio.sleep(0.1)

        previous_result = state.get("http_result", "No HTTP result available")
        span.set_attribute("previous.http_result", previous_result)

        span.add_event("Processing HTTP result from previous node")

        span_id = span.get_span_context().span_id
        span_result = f"OpenTelemetry span created successfully! Span ID: {span_id}"

        span.add_event("Span processing completed")
        span.set_attribute("processing.status", "completed")

        logger.info(span_result)

    return {"span_result": span_result}


def create_test_agent():
    """
    Build and compile the two-node LangGraph agent.

    The graph runs ``http_call`` first and ``otel_span`` second:
    ``START -> http_call -> otel_span -> END``.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    logger.info("Creating test agent...")

    graph = StateGraph(TestAgentState)

    # Register nodes in execution order.
    for node_name, node_fn in (
        ("http_call", http_call_node),
        ("otel_span", opentelemetry_span_node),
    ):
        graph.add_node(node_name, node_fn)

    # Wire the linear path through the graph.
    for source, target in (
        (START, "http_call"),
        ("http_call", "otel_span"),
        ("otel_span", END),
    ):
        graph.add_edge(source, target)

    compiled = graph.compile()

    logger.info("Test agent created successfully!")
    return compiled


async def run_test_agent():
    """
    Build the agent, invoke it once, and print a summary.

    The whole run is wrapped in a root span named
    ``test_agent_execution_root``. Progress is recorded on that span as
    events and attributes; on failure the error type and message are
    recorded on the span, the error is logged, and the exception is
    re-raised.

    Returns:
        The final state returned by ``agent.ainvoke``.

    Raises:
        Exception: Whatever was raised while creating or invoking the agent.
    """
    logger.info("Starting test agent execution...")

    with tracer.start_as_current_span("test_agent_execution_root") as root_span:
        for attr_key, attr_value in (
            ("agent.name", "test_agent"),
            ("agent.version", "1.0.0"),
            ("execution.type", "full_agent_run"),
        ):
            root_span.set_attribute(attr_key, attr_value)

        root_span.add_event("Agent execution started")

        try:
            root_span.add_event("Creating agent graph")
            agent = create_test_agent()
            root_span.set_attribute("agent.nodes_count", 2)

            initial_state = dict(http_result="", span_result="", messages=[])
            root_span.add_event("Initial state prepared")

            root_span.add_event("Starting agent invocation")
            final_state = await agent.ainvoke(initial_state)

            root_span.set_attribute("execution.status", "completed")
            # Success flags are derived from the wording of the node messages.
            http_ok = "success" in final_state.get('http_result', '')
            root_span.set_attribute("http_call.success", http_ok)
            span_ok = "successfully" in final_state.get('span_result', '')
            root_span.set_attribute("span_creation.success", span_ok)

            root_span.add_event("Agent execution completed successfully")

            rule = "=" * 60
            print("\n" + rule)
            print("TEST AGENT EXECUTION COMPLETED!")
            print(rule)
            print(f"HTTP Result: {final_state.get('http_result', 'N/A')}")
            print(f"Span Result: {final_state.get('span_result', 'N/A')}")
            print(f"Root Span ID: {root_span.get_span_context().span_id}")
            print(f"Trace ID: {root_span.get_span_context().trace_id}")
            print(rule)

            return final_state

        except Exception as exc:
            root_span.set_attribute("execution.status", "failed")
            root_span.set_attribute("error.type", type(exc).__name__)
            root_span.set_attribute("error.message", str(exc))
            root_span.add_event("Agent execution failed", {"error": str(exc)})

            logger.error(f"Test agent execution failed: {exc}")
            raise


async def main():
    """
    Entry-point coroutine: run the test agent and report the outcome.

    Failures are printed rather than propagated, so this coroutine returns
    normally whether the run succeeded, failed, or was interrupted.
    """
    try:
        await run_test_agent()
        print("\nAll spans have been created and should be visible in your tracing backend!")
    except Exception as exc:
        print(f"\nTest failed: {exc}")
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it needs its
        # own handler.
        print("\nTest interrupted by user")


# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main()) 

👀 Have you spent some time to check if this bug has been raised before?

  • I checked and didn't find a similar issue

Are you willing to submit PR?

None

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions