@duncankmckinnon commented Nov 13, 2025

Add OpenInference Instrumentation for Pipecat

This PR implements comprehensive OpenTelemetry tracing for Pipecat voice agents using OpenInference semantic conventions, enabling production-ready observability for voice AI applications.

Overview

Adds automatic instrumentation for Pipecat pipelines that captures:

  • Turn-level spans: Complete conversation exchanges with user input/output
  • Service-level spans: Individual LLM, TTS, and STT operations with proper directionality
  • Flat span hierarchy: All service spans as siblings under turn spans for clear visualization
  • Rich attributes: Model names, providers, token counts, latency metrics, and full conversation history in Arize-compatible format

Key Features

1. Automatic Instrumentation via Observer Pattern

The instrumentor wraps PipelineTask.__init__ to automatically inject an observer into every task:

import os

from arize.otel import register
from openinference.instrumentation.pipecat import PipecatInstrumentor

tracer_provider = register(
    space_id=os.getenv("ARIZE_SPACE_ID"),
    api_key=os.getenv("ARIZE_API_KEY"),
    project_name=os.getenv("ARIZE_PROJECT_NAME"),
)

PipecatInstrumentor().instrument(
    tracer_provider=tracer_provider,
    debug_log_filename="debug.log"  # Optional
)

No code changes are needed in your pipeline: instrument once and every PipelineTask instance gets automatic tracing.

2. Intelligent Turn Tracking

Implements conversation turn tracking using speaking events to define natural conversation boundaries:

  • Start turn: UserStartedSpeakingFrame or StartFrame (first pipeline frame)
  • End turn: Timeout after BotStoppedSpeakingFrame (configurable, default 2.5s)
  • Interruption handling: New turn starts immediately when user interrupts bot
  • Auto-start: First service activity auto-starts a turn if none active

This approach ensures one turn span per actual conversation exchange with proper handling of multi-part bot responses (e.g., function calls causing multiple TTS segments).
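A minimal sketch of those boundary rules, assuming hypothetical helper names (TurnTracker, _start_turn, _finish_turn) around Pipecat's real speaking frames:

import asyncio

from pipecat.frames.frames import (
    BotStoppedSpeakingFrame,
    StartFrame,
    UserStartedSpeakingFrame,
)

TURN_END_TIMEOUT_S = 2.5  # matches the configurable default above


class TurnTracker:
    def __init__(self):
        self._turn_active = False
        self._pending_end: asyncio.Task | None = None

    async def on_frame(self, frame) -> None:
        if isinstance(frame, (UserStartedSpeakingFrame, StartFrame)):
            self._cancel_pending_end()
            if self._turn_active:
                self._finish_turn("interrupted")  # user barged in on the bot
            self._start_turn()
        elif isinstance(frame, BotStoppedSpeakingFrame) and self._turn_active:
            # The bot may speak again (e.g., multi-part responses around a
            # function call), so end the turn only after a quiet period.
            self._cancel_pending_end()
            self._pending_end = asyncio.create_task(self._end_after_timeout())

    async def _end_after_timeout(self) -> None:
        await asyncio.sleep(TURN_END_TIMEOUT_S)
        self._finish_turn("completed")

    def _cancel_pending_end(self) -> None:
        if self._pending_end and not self._pending_end.done():
            self._pending_end.cancel()
        self._pending_end = None

    def _start_turn(self) -> None:
        self._turn_active = True  # the real observer opens the turn span here

    def _finish_turn(self, reason: str) -> None:
        self._turn_active = False  # the real observer ends the span with `reason`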

3. Bidirectional Frame Processing with Deduplication

Captures frames both entering (input) and leaving (output) services with intelligent deduplication:

  • Directional filtering: INPUT_VALUE only captured when is_input=True, OUTPUT_VALUE only when is_input=False
  • Streaming accumulation: LLM and TTS streaming chunks accumulated with smart deduplication
  • Special handling for STT: TranscriptionFrame is OUTPUT from STT but recorded as INPUT for observability
  • TTS filtering: Only captures TTS text when going to BaseOutputTransport (final output)

Deduplication handles cumulative chunks (e.g., "Hello" → "Hello world" → "Hello world!") by detecting overlaps and extracting only new content.
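A sketch of that overlap handling, with merge_chunk as a hypothetical helper name (not the PR's exact function):

def merge_chunk(accumulated: str, chunk: str) -> str:
    """Append only the new content from a possibly-cumulative chunk."""
    if not accumulated:
        return chunk
    if chunk.startswith(accumulated):
        return chunk  # cumulative chunk already contains the old text
    # Partial overlap: find the longest prefix of `chunk` that is a suffix
    # of `accumulated`, then append only what follows it.
    for i in range(len(chunk), 0, -1):
        if accumulated.endswith(chunk[:i]):
            return accumulated + chunk[i:]
    return accumulated + chunk  # disjoint chunk: plain append


assert merge_chunk("Hello", "Hello world") == "Hello world"
assert merge_chunk("Hello world", "Hello world!") == "Hello world!"
assert merge_chunk("Hello", " world") == "Hello world"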

4. Multiple LLM Invocations Per Turn

Properly handles multiple LLM calls within a single turn (e.g., function calling flows):

  • Detects new invocations via LLMContextFrame
  • Finishes previous LLM span before starting new one
  • Each LLM call gets its own span with full message context
  • Prevents output accumulation across different invocations
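A sketch of that rollover logic; LLMContextFrame is the real Pipecat frame (import path varies by Pipecat version), while the span-handling helpers are hypothetical names:

def _on_llm_frame(self, frame) -> None:
    # Each LLMContextFrame marks a fresh invocation, so close the previous
    # LLM span and reset output accumulation before starting a new one.
    if type(frame).__name__ == "LLMContextFrame":
        if self._llm_span is not None:
            self._finish_service_span(self._llm_span)
        self._llm_span = self._start_service_span("llm")
        self._llm_output_chunks = []  # no carry-over between invocations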

5. Flattened Message Format for Arize

Message history is exported in the flattened format that Arize expects:

# Instead of a single JSON string:
"llm.input_messages": "[{role: 'user', content: '...'}]"

# We set individual attributes:
"llm.input_messages.0.message.role": "system"
"llm.input_messages.0.message.content": "You are a helpful assistant"
"llm.input_messages.1.message.role": "user"
"llm.input_messages.1.message.content": "What is quantum computing?"
# ... and so on

This enables proper display in Arize's UI with message-level filtering and analysis.
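A sketch of the flattening step itself (flatten_messages is a hypothetical helper, not the PR's exact function):

def flatten_messages(messages, prefix="llm.input_messages"):
    """Expand a message list into indexed span attributes."""
    attributes = {}
    for i, message in enumerate(messages):
        attributes[f"{prefix}.{i}.message.role"] = message["role"]
        attributes[f"{prefix}.{i}.message.content"] = message["content"]
    return attributes


flatten_messages([
    {"role": "system", "content": "You are a helpful assistant"},
    {"role": "user", "content": "What is quantum computing?"},
])
# -> {"llm.input_messages.0.message.role": "system", ...}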

6. Consistent Time Handling

Uses time.time_ns() (Unix epoch) consistently for all span timestamps:

  • Span start time: Recorded at service span creation
  • Span end time: Calculated from start_time_ns + processing_time_seconds when metrics available
  • Avoids mixing clocks: Does not use Pipecat's monotonic_ns() timestamps, which are relative to pipeline start

This preserves the end_time >= start_time invariant required by OpenTelemetry.
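A sketch of the arithmetic, using real OpenTelemetry APIs with an illustrative metric value:

import time

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

start_time_ns = time.time_ns()  # Unix epoch, recorded at span creation
span = tracer.start_span("pipecat.tts", start_time=start_time_ns)

# When a MetricsFrame reports processing time, derive the end time from the
# same clock so end_time can never precede start_time.
processing_time_seconds = 1.57  # illustrative value from a MetricsFrame
span.end(end_time=start_time_ns + int(processing_time_seconds * 1e9))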

7. Multi-Provider Service Detection

Automatically detects and attributes service types and providers:

  • LLM Services: OpenAI, Anthropic (sets llm.provider, llm.model_name)
  • TTS Services: OpenAI, ElevenLabs, Cartesia (sets audio.voice, audio.voice_id)
  • STT Services: OpenAI, Deepgram, Cartesia
  • Generic detection: Works with any service inheriting from Pipecat base classes

Sets service.name to the actual service class name for unique identification.
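One way to express the generic detection, sketched here via MRO class names rather than exact base-class imports (which vary across Pipecat versions); the "unknown" sentinel matches the behavior noted in the review comments below:

def detect_service_type(processor) -> str:
    """Classify a frame processor by the base-class names in its MRO."""
    mro_names = {cls.__name__ for cls in type(processor).__mro__}
    if "LLMService" in mro_names:
        return "llm"
    if "TTSService" in mro_names:
        return "tts"
    if "STTService" in mro_names:
        return "stt"
    return "unknown"  # sentinel for unrecognized processors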

8. Session Tracking

Automatically extracts conversation_id from PipelineTask and sets as session.id attribute on all spans, enabling conversation-level filtering in observability platforms.
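A minimal sketch, assuming the observer holds a reference to the task and that the task exposes the conversation_id it was constructed with:

conversation_id = getattr(task, "conversation_id", None)  # assumed attribute
if conversation_id:
    span.set_attribute("session.id", conversation_id)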

Implementation Details

Core Components

PipecatInstrumentor (__init__.py)

  • Wraps PipelineTask.__init__ using wrapt
  • Injects OpenInferenceObserver into each task
  • Supports optional debug_log_filename parameter for detailed frame logging
  • Thread-safe: creates separate observer instance per task
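A sketch of the wrapping using wrapt's real wrap_function_wrapper; how the observer is attached (here via an observers kwarg) is an assumption, and OpenInferenceObserver/_tracer are assumed to be in scope:

from wrapt import wrap_function_wrapper


def _wrapped_init(wrapped, instance, args, kwargs):
    # Run __init__ with an OpenInferenceObserver appended, one per task.
    observers = list(kwargs.get("observers") or [])
    observers.append(OpenInferenceObserver(tracer=_tracer))  # assumed injection
    kwargs["observers"] = observers
    return wrapped(*args, **kwargs)


wrap_function_wrapper("pipecat.pipeline.task", "PipelineTask.__init__", _wrapped_init)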

OpenInferenceObserver (_observer.py)

  • Implements Pipecat's BaseObserver interface
  • Listens to on_push_frame events with bidirectional processing (input/output)
  • Creates turn spans and service spans with proper OpenTelemetry context propagation
  • Tracks turn state: active turn, user text, bot text, speaking status
  • Handles frame deduplication to avoid processing propagated frames twice
  • Auto-starts turns when first service activity detected
  • Finishes service spans before turn spans to maintain proper hierarchy

Frame Attribute Extractors (_attributes.py)

  • Extracts OpenInference-compliant attributes from Pipecat frames
  • Handles multiple frame types: TranscriptionFrame, LLMContextFrame, LLMTextFrame, TTSTextFrame, MetricsFrame, etc.
  • Captures: LLM messages (flattened format), audio metadata, token counts, processing times, errors
  • Service attribute extraction for span creation with provider-specific details

Span Hierarchy

pipecat.conversation.turn (trace_id: abc123)
├── pipecat.stt (parent_id: turn_span_id, trace_id: abc123)
├── pipecat.llm (parent_id: turn_span_id, trace_id: abc123)
├── pipecat.llm (parent_id: turn_span_id, trace_id: abc123)  # Second invocation
└── pipecat.tts (parent_id: turn_span_id, trace_id: abc123)

Flat hierarchy: All service spans are siblings under the turn span (no nesting) for clearer visualization in tracing UIs. All spans within a turn share the same trace_id and have session.id attribute set.

Context Propagation

Service spans are created with the turn span's context:

from opentelemetry import trace as trace_api

turn_context = trace_api.set_span_in_context(self._turn_span)
span = self._tracer.start_span(
    name=f"pipecat.{service_type}",
    context=turn_context,  # Links to turn span
)

This ensures proper parent-child relationships and enables distributed tracing.

Testing

Test Coverage

69 tests covering:

  1. Instrumentor Basics (test_instrumentor.py):

    • Initialization, instrumentation, uninstrumentation
    • Observer injection into tasks
    • Singleton behavior
    • Configuration handling
  2. Turn Tracking (test_turn_tracking.py):

    • Turn creation on user/bot speech
    • Multiple sequential turns
    • Turn interruption handling
    • Input/output text capture
    • Session ID attribution
    • Turn span hierarchy
  3. Service Detection (test_service_detection.py):

    • LLM/TTS/STT service type detection
    • Multi-provider detection (OpenAI, Anthropic, ElevenLabs, Deepgram)
    • Metadata extraction (models, voices, providers)
    • Custom service inheritance
  4. Provider Spans (test_provider_spans.py):

    • Span creation for different providers
    • Correct span attributes per service type
    • Input/output capture for each service
    • Mixed provider pipelines
    • Provider-specific attributes (model names, voice IDs)

Mock Infrastructure

Comprehensive mocks in conftest.py:

  • Mock LLM/TTS/STT services with configurable metadata
  • Helper functions for running pipeline tasks
  • Span extraction and assertion utilities
  • Support for multiple provider combinations

All tests use in-memory span exporters for fast, isolated testing.
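A typical fixture along these lines (a sketch; the PR's conftest.py may differ):

import pytest
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from openinference.instrumentation.pipecat import PipecatInstrumentor


@pytest.fixture
def span_exporter():
    exporter = InMemorySpanExporter()
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(exporter))
    PipecatInstrumentor().instrument(tracer_provider=provider)
    yield exporter  # tests read exporter.get_finished_spans()
    PipecatInstrumentor().uninstrument()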

Example Usage

Complete Tracing Example

See examples/trace/001-trace.py for a full working example:

import os
from datetime import datetime

from arize.otel import register
from openinference.instrumentation.pipecat import PipecatInstrumentor
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Generate unique conversation ID
conversation_id = f"conversation-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
debug_log_filename = f"pipecat_frames_{conversation_id}.log"

# Set up tracing
tracer_provider = register(
    space_id=os.getenv("ARIZE_SPACE_ID"),
    api_key=os.getenv("ARIZE_API_KEY"),
    project_name=os.getenv("ARIZE_PROJECT_NAME"),
)

PipecatInstrumentor().instrument(
    tracer_provider=tracer_provider,
    debug_log_filename=debug_log_filename,
)

# Create your pipeline (STT -> LLM -> TTS)
pipeline = Pipeline([stt, llm, tts, transport.output()])

# Create task with conversation ID
task = PipelineTask(
    pipeline,
    conversation_id=conversation_id,
    params=PipelineParams(enable_metrics=True)
)

# Run - tracing happens automatically!
await runner.run(task)

What Gets Traced

For a single user query → bot response exchange:

Turn Span (pipecat.conversation.turn):

  • session.id: "conversation-20251113_152502"
  • input.value: "What is quantum computing?"
  • output.value: "Quantum computing is a type of computing that uses quantum mechanics..."
  • conversation.turn_number: 1
  • conversation.turn_duration_seconds: 3.5
  • conversation.end_reason: "completed"

STT Span (pipecat.stt):

  • service.name: "OpenAISTTService"
  • service.type: "stt"
  • llm.provider: "openai"
  • llm.model_name: "gpt-4o-transcribe"
  • input.value: "What is quantum computing?" (transcribed text)
  • audio.transcript: "What is quantum computing?"
  • Duration: 0.78 seconds

LLM Span (pipecat.llm):

  • service.name: "OpenAILLMService"
  • service.type: "llm"
  • llm.provider: "openai"
  • llm.model_name: "gpt-4"
  • input.value: "What is quantum computing?" (last user message)
  • output.value: "Quantum computing is..." (accumulated streaming response)
  • llm.input_messages.0.message.role: "system"
  • llm.input_messages.0.message.content: "You are a helpful assistant"
  • llm.input_messages.1.message.role: "user"
  • llm.input_messages.1.message.content: "What is quantum computing?"
  • llm.output_messages.0.message.role: "assistant"
  • llm.output_messages.0.message.content: "Quantum computing is..."
  • llm.token_count.total: 520
  • llm.token_count.prompt: 380
  • llm.token_count.completion: 140
  • Duration: 2.77 seconds

TTS Span (pipecat.tts):

  • service.name: "OpenAITTSService"
  • service.type: "tts"
  • llm.provider: "openai"
  • llm.model_name: "gpt-4o-mini-tts"
  • audio.voice: "ballad"
  • audio.voice_id: "ballad"
  • output.value: "Quantum computing is..." (synthesized text)
  • service.processing_time_seconds: 1.57
  • Duration: 1.57 seconds

Note

Introduce OpenInference auto-instrumentation for Pipecat with turn- and service-level tracing, examples, and comprehensive tests, plus CI integration.

  • Instrumentation (new package python/instrumentation/openinference-instrumentation-pipecat/):
    • Core: Implement PipecatInstrumentor to auto-inject OpenInferenceObserver into PipelineTask for Pipecat.
    • Observer: Add turn tracking and service-level span creation with input/output accumulation and metrics handling.
    • Attribute Extraction: Map Pipecat frames/services to OpenInference/GenAI span attributes (LLM/STT/TTS, tools, tokens, processing times).
    • Metadata: Package versioning and instrumentation dependency declaration.
  • Docs & Examples:
    • Add README with quickstart; example tracing script (examples/trace/001-trace.py) and env template.
    • Add CHANGELOG and LICENSE.
  • Tests:
    • Extensive unit tests for instrumentor lifecycle, provider/service detection, span attributes, turn tracking, and mixed-provider pipelines.
  • Build/CI:
    • Add pyproject.toml with entry points and extras; update python/tox.ini to include pipecat environments.
  • Misc:
    • Update root .gitignore to ignore *.code-workspace.

Written by Cursor Bugbot for commit f53ef4e.

@duncankmckinnon requested a review from a team as a code owner November 13, 2025 18:26
@dosubot added the size:XXL label (changes 1000+ lines, ignoring generated files) Nov 13, 2025
@duncankmckinnon changed the title from "Incorporate pipecat" to "(feat): add auto instrumentor for pipecat" Nov 13, 2025
@duncankmckinnon changed the title from "(feat): add auto instrumentor for pipecat" to "feat(pipecat): add auto instrumentor for pipecat" Nov 13, 2025

# If we've exceeded our history size, rebuild the set from deque
if len(self._processed_frames) > len(self._frame_history):
    self._processed_frames = set(self._frame_history)

Bug: Frame ID type mismatch breaks duplicate detection

Frame ID handling is inconsistent between the membership check and storage operations. Line 183 checks frame.id in self._processed_frames using raw frame.id, but line 188 adds int(frame.id) to the set. The deque at line 189 stores raw frame.id, and when the set is rebuilt at line 193 from the deque, it contains raw values. If frame.id is ever a non-integer type (e.g., string "123"), the membership check "123" in {123} returns False, causing the same frame to be processed multiple times and defeating the duplicate detection mechanism.
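One possible fix, sketched here, normalizes the ID once and uses the same value for the check, the set, and the deque:

frame_id = int(frame.id)  # normalize once
if frame_id in self._processed_frames:
    return True  # duplicate: already processed
self._processed_frames.add(frame_id)
self._frame_history.append(frame_id)  # set and deque now hold the same type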



# Handle frames received by a service (inputs)
# Only process if destination is different from source to avoid double-counting
if dest_service_type and data.destination != data.source:

Bug: Truthy string check always passes for service type

The checks if source_service_type: and if dest_service_type: on lines 242 and 247 are always truthy because detect_service_type returns the string "unknown" (truthy) when no service type is found, rather than None or empty string. This causes _handle_service_frame to be called for every frame regardless of whether the source/destination is a recognized service. While there's a correct check inside _handle_service_frame (if service_type != "unknown":), the inconsistency wastes function calls and may cause subtle issues. The checks should be if source_service_type != "unknown": and if dest_service_type != "unknown": to match the internal check.
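A sketch of the suggested form (argument names beyond those in the snippet are assumptions):

if source_service_type != "unknown":
    self._handle_service_frame(data.source, source_service_type, is_input=False)
if dest_service_type != "unknown" and data.destination != data.source:
    self._handle_service_frame(data.destination, dest_service_type, is_input=True)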

(The same issue appears in one additional location.)


# Check if destination is the final output transport
if not isinstance(data.destination, BaseOutputTransport):
    self._log_debug(" Skipping TTS chunk (not going to output transport)")
    text_chunk = ""  # Skip this chunk

Bug: TTS input chunks always skipped due to incorrect condition

The TTS input text accumulation logic will never accumulate text because the condition is checking the wrong thing. When is_input=True and service_type == "tts", the service (and thus data.destination) is the TTS service itself (set at line 319). The check isinstance(data.destination, BaseOutputTransport) will always be False because TTS services don't inherit from BaseOutputTransport. This causes text_chunk to always be set to empty string, meaning TTS spans will never have their accumulated_input populated. The condition doesn't match the stated intent of "only accumulate if going to output transport."
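A sketched fix matching the stated intent: apply the output-transport filter only on the output side, since on input the destination is the TTS service itself:

if is_input:
    text_chunk = frame.text  # accumulate TTS input text unconditionally
elif isinstance(data.destination, BaseOutputTransport):
    text_chunk = frame.text  # final synthesized output
else:
    self._log_debug(" Skipping TTS chunk (not going to output transport)")
    text_chunk = ""  # not final output; skip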

