feat(pipecat): add auto instrumentor for pipecat #2441
base: main
Conversation
```python
# If we've exceeded our history size, rebuild the set from deque
if len(self._processed_frames) > len(self._frame_history):
    self._processed_frames = set(self._frame_history)
```
Bug: Frame ID type mismatch breaks duplicate detection
Frame ID handling is inconsistent between the membership check and storage operations. Line 183 checks frame.id in self._processed_frames using raw frame.id, but line 188 adds int(frame.id) to the set. The deque at line 189 stores raw frame.id, and when the set is rebuilt at line 193 from the deque, it contains raw values. If frame.id is ever a non-integer type (e.g., string "123"), the membership check "123" in {123} returns False, causing the same frame to be processed multiple times and defeating the duplicate detection mechanism.
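A minimal sketch of one possible fix: normalize the ID once so the membership check, the set, and the deque all store the same type. The names mirror the snippet above; the `_FrameDeduper` wrapper and `seen_before` helper are hypothetical, added only to make the sketch self-contained.

```python
from collections import deque

class _FrameDeduper:
    """Illustrative container for the dedup state discussed above."""

    def __init__(self, max_history: int = 1000) -> None:
        self._processed_frames: set[int] = set()
        self._frame_history: deque[int] = deque(maxlen=max_history)

    def seen_before(self, frame) -> bool:
        """Record the frame and report whether it was already processed."""
        frame_id = int(frame.id)  # normalize once, up front
        if frame_id in self._processed_frames:
            return True
        self._processed_frames.add(frame_id)
        self._frame_history.append(frame_id)
        # The bounded deque evicts old IDs; rebuild the set so it stays bounded too
        if len(self._processed_frames) > len(self._frame_history):
            self._processed_frames = set(self._frame_history)
        return False
```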
```python
# Handle frames received by a service (inputs)
# Only process if destination is different from source to avoid double-counting
if dest_service_type and data.destination != data.source:
```
Bug: Truthy string check always passes for service type
The checks if source_service_type: and if dest_service_type: on lines 242 and 247 are always truthy because detect_service_type returns the string "unknown" (truthy) when no service type is found, rather than None or empty string. This causes _handle_service_frame to be called for every frame regardless of whether the source/destination is a recognized service. While there's a correct check inside _handle_service_frame (if service_type != "unknown":), the inconsistency wastes function calls and may cause subtle issues. The checks should be if source_service_type != "unknown": and if dest_service_type != "unknown": to match the internal check.
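A sketch of the suggested guards, assuming `detect_service_type` returns the string sentinel `"unknown"` (as described above) and `_handle_service_frame` keeps its existing signature; the method name here is illustrative:

```python
def _on_push_frame(self, data) -> None:
    source_service_type = detect_service_type(data.source)
    dest_service_type = detect_service_type(data.destination)

    # Compare against the sentinel instead of relying on truthiness, so
    # unrecognized processors never reach _handle_service_frame at all.
    if source_service_type != "unknown":
        self._handle_service_frame(data, source_service_type, is_input=False)

    # Handle frames received by a service (inputs); only process if the
    # destination differs from the source to avoid double-counting.
    if dest_service_type != "unknown" and data.destination != data.source:
        self._handle_service_frame(data, dest_service_type, is_input=True)
```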
```python
# Check if destination is the final output transport
if not isinstance(data.destination, BaseOutputTransport):
    self._log_debug("  Skipping TTS chunk (not going to output transport)")
    text_chunk = ""  # Skip this chunk
```
Bug: TTS input chunks always skipped due to incorrect condition
The TTS input text accumulation logic will never accumulate text because the condition is checking the wrong thing. When is_input=True and service_type == "tts", the service (and thus data.destination) is the TTS service itself (set at line 319). The check isinstance(data.destination, BaseOutputTransport) will always be False because TTS services don't inherit from BaseOutputTransport. This causes text_chunk to always be set to empty string, meaning TTS spans will never have their accumulated_input populated. The condition doesn't match the stated intent of "only accumulate if going to output transport."
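One way the condition could be repaired, sketched under the assumption that the output-transport check is only meaningful for frames leaving a service: for frames entering the TTS service, `data.destination` is the TTS service itself, so the chunk should be kept. This is a fragment mirroring the method body above, not a drop-in patch.

```python
if is_input:
    # Frames arriving at the TTS service: data.destination is the TTS
    # service itself, so the output-transport check cannot apply here.
    accumulated_input += text_chunk
elif not isinstance(data.destination, BaseOutputTransport):
    # Frames leaving the service that never reach the final output
    # transport are intermediate hops; skip them as before.
    self._log_debug("  Skipping TTS chunk (not going to output transport)")
    text_chunk = ""
```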
Add OpenInference Instrumentation for Pipecat
This PR implements comprehensive OpenTelemetry tracing for Pipecat voice agents using OpenInference semantic conventions, enabling production-ready observability for voice AI applications.
Overview
Adds automatic instrumentation for Pipecat pipelines that captures:

- Conversation turn spans with input/output values and turn metadata
- Per-service spans for STT, LLM, and TTS, with provider and model attributes
- Flattened LLM message history and token counts
- Session IDs for conversation-level filtering
Key Features
1. Automatic Instrumentation via Observer Pattern
The instrumentor wraps `PipelineTask.__init__` to automatically inject an observer into every task. No code changes needed in your pipeline - just instrument once and all `PipelineTask` instances get automatic tracing.
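A minimal setup sketch; the instrumentor name matches this PR, while the exporter wiring is ordinary OpenTelemetry boilerplate:

```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from openinference.instrumentation.pipecat import PipecatInstrumentor

tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

# One call patches PipelineTask.__init__; every PipelineTask created
# afterwards gets an OpenInferenceObserver injected automatically.
PipecatInstrumentor().instrument(tracer_provider=tracer_provider)
```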
2. Intelligent Turn Tracking
Implements conversation turn tracking using speaking events to define natural conversation boundaries:
- A turn starts on `UserStartedSpeakingFrame` or `StartFrame` (first pipeline frame)
- A turn ends after a configurable quiet period following `BotStoppedSpeakingFrame` (default 2.5s)

This approach ensures one turn span per actual conversation exchange, with proper handling of multi-part bot responses (e.g., function calls causing multiple TTS segments).
3. Bidirectional Frame Processing with Deduplication
Captures frames both entering (input) and leaving (output) services with intelligent deduplication:
- INPUT_VALUE is recorded only when `is_input=True`, OUTPUT_VALUE only when `is_input=False`
- `TranscriptionFrame` is OUTPUT from STT but recorded as INPUT for observability
- Only text reaching `BaseOutputTransport` (the final output) is attributed as pipeline output

Deduplication handles cumulative chunks (e.g., "Hello" → "Hello world" → "Hello world!") by detecting overlaps and extracting only the new content, as sketched below.
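A small sketch of the overlap-detection idea for cumulative chunks; `extract_new_content` is a hypothetical helper name, not necessarily the PR's:

```python
def extract_new_content(accumulated: str, chunk: str) -> str:
    """Return only the portion of `chunk` not already in `accumulated`."""
    if chunk.startswith(accumulated):
        # Cumulative chunk: "Hello" -> "Hello world" yields " world"
        return chunk[len(accumulated):]
    # Otherwise find the longest suffix of `accumulated` that prefixes
    # `chunk` and strip it, so partially overlapping chunks still work.
    for start in range(1, len(accumulated)):
        if chunk.startswith(accumulated[start:]):
            return chunk[len(accumulated) - start:]
    return chunk

assert extract_new_content("Hello", "Hello world") == " world"
assert extract_new_content("Hello world", "Hello world!") == "!"
```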
4. Multiple LLM Invocations Per Turn
Properly handles multiple LLM calls within a single turn (e.g., function calling flows), with each incoming `LLMContextFrame` marking a separate LLM invocation.

5. Flattened Message Format for Arize
Message history is exported in the flattened format that Arize expects:
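The attribute shape, mirrored from the LLM span example later in this description:

```
llm.input_messages.0.message.role     = "system"
llm.input_messages.0.message.content  = "You are a helpful assistant"
llm.input_messages.1.message.role     = "user"
llm.input_messages.1.message.content  = "What is quantum computing?"
llm.output_messages.0.message.role    = "assistant"
```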
This enables proper display in Arize's UI with message-level filtering and analysis.
6. Consistent Time Handling
Uses `time.time_ns()` (Unix epoch) consistently for all span timestamps:

- Computes end times as `start_time_ns + processing_time_seconds` when metrics are available
- Avoids `monotonic_ns()` timestamps, which are relative to pipeline start

This ensures the `end_time >= start_time` invariant required by OpenTelemetry.
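A small sketch of the end-time arithmetic, assuming `processing_time_seconds` comes from Pipecat's metrics frames:

```python
import time

start_time_ns = time.time_ns()  # Unix epoch; matches OpenTelemetry's clock
# ... the service does its work; metrics report its processing time ...
processing_time_seconds = 1.57  # example value from a MetricsFrame
end_time_ns = start_time_ns + int(processing_time_seconds * 1_000_000_000)
assert end_time_ns >= start_time_ns  # invariant required by OpenTelemetry
```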
7. Multi-Provider Service Detection
Automatically detects and attributes service types and providers:
- Provider and model attributes for LLM, STT, and TTS services (`llm.provider`, `llm.model_name`)
- Voice attributes for TTS services (`audio.voice`, `audio.voice_id`)

Sets `service.name` to the actual service class name for unique identification.
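A hypothetical sketch of class-name-based detection; the real implementation may differ, but the `"unknown"` sentinel matches the review discussion above:

```python
def detect_service_type(processor: object) -> str:
    """Map a Pipecat processor to a coarse service type by class name."""
    name = type(processor).__name__
    for marker, service_type in (("STT", "stt"), ("LLM", "llm"), ("TTS", "tts")):
        if marker in name:
            return service_type
    return "unknown"  # string sentinel, not None -- see the truthiness bug above
```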
8. Session Tracking
Automatically extracts `conversation_id` from `PipelineTask` and sets it as the `session.id` attribute on all spans, enabling conversation-level filtering in observability platforms.

Implementation Details
Core Components
- `PipecatInstrumentor` (`__init__.py`): wraps `PipelineTask.__init__` using `wrapt` and injects an `OpenInferenceObserver` into each task; supports a `debug_log_filename` parameter for detailed frame logging
- `OpenInferenceObserver` (`_observer.py`): implements the `BaseObserver` interface and handles `on_push_frame` events with bidirectional processing (input/output)
- Frame attribute extractors (`_attributes.py`): extract attributes from `TranscriptionFrame`, `LLMContextFrame`, `LLMTextFrame`, `TTSTextFrame`, `MetricsFrame`, etc.

Span Hierarchy
Flat hierarchy: all service spans are siblings under the turn span (no nesting) for clearer visualization in tracing UIs. All spans within a turn share the same `trace_id` and have the `session.id` attribute set.

Context Propagation
Service spans are created with the turn span's context:
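A minimal sketch of that pattern using the standard OpenTelemetry context API; span and variable names are illustrative:

```python
from opentelemetry import trace

tracer = trace.get_tracer("openinference.instrumentation.pipecat")

with tracer.start_as_current_span("pipecat.conversation.turn") as turn_span:
    # Build a Context carrying the turn span, then parent each service
    # span on it so all spans in the turn share the turn's trace_id.
    turn_context = trace.set_span_in_context(turn_span)
    service_span = tracer.start_span("pipecat.llm", context=turn_context)
    service_span.end()
```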
This ensures proper parent-child relationships and enables distributed tracing.
Testing
Test Coverage
69 tests covering:
- Instrumentor Basics (`test_instrumentor.py`)
- Turn Tracking (`test_turn_tracking.py`)
- Service Detection (`test_service_detection.py`)
- Provider Spans (`test_provider_spans.py`)

Mock Infrastructure

Comprehensive mocks live in `conftest.py`. All tests use in-memory span exporters for fast, isolated testing.
Example Usage
Complete Tracing Example
See examples/trace/001-trace.py for a full working example:
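A hedged skeleton of what such an example looks like; real STT/LLM/TTS service construction is elided, and an empty pipeline stands in only to keep the sketch self-contained (it runs but does nothing useful):

```python
import asyncio

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask

from openinference.instrumentation.pipecat import PipecatInstrumentor

# Instrument once, before any PipelineTask is constructed.
PipecatInstrumentor().instrument()

async def main() -> None:
    # Build your usual STT -> LLM -> TTS processor chain here.
    pipeline = Pipeline([])
    task = PipelineTask(pipeline)  # observer injected automatically
    await PipelineRunner().run(task)  # runs until the pipeline ends

asyncio.run(main())
```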
What Gets Traced
For a single user query → bot response with a follow-up question:
Turn Span (`pipecat.conversation.turn`):

- `session.id`: "conversation-20251113_152502"
- `input.value`: "What is quantum computing?"
- `output.value`: "Quantum computing is a type of computing that uses quantum mechanics..."
- `conversation.turn_number`: 1
- `conversation.turn_duration_seconds`: 3.5
- `conversation.end_reason`: "completed"

STT Span (`pipecat.stt`):

- `service.name`: "OpenAISTTService"
- `service.type`: "stt"
- `llm.provider`: "openai"
- `llm.model_name`: "gpt-4o-transcribe"
- `input.value`: "What is quantum computing?" (transcribed text)
- `audio.transcript`: "What is quantum computing?"

LLM Span (`pipecat.llm`):

- `service.name`: "OpenAILLMService"
- `service.type`: "llm"
- `llm.provider`: "openai"
- `llm.model_name`: "gpt-4"
- `input.value`: "What is quantum computing?" (last user message)
- `output.value`: "Quantum computing is..." (accumulated streaming response)
- `llm.input_messages.0.message.role`: "system"
- `llm.input_messages.0.message.content`: "You are a helpful assistant"
- `llm.input_messages.1.message.role`: "user"
- `llm.input_messages.1.message.content`: "What is quantum computing?"
- `llm.output_messages.0.message.role`: "assistant"
- `llm.output_messages.0.message.content`: "Quantum computing is..."
- `llm.token_count.total`: 520
- `llm.token_count.prompt`: 380
- `llm.token_count.completion`: 140

TTS Span (`pipecat.tts`):

- `service.name`: "OpenAITTSService"
- `service.type`: "tts"
- `llm.provider`: "openai"
- `llm.model_name`: "gpt-4o-mini-tts"
- `audio.voice`: "ballad"
- `audio.voice_id`: "ballad"
- `output.value`: "Quantum computing is..." (synthesized text)
- `service.processing_time_seconds`: 1.57

Note
Introduce OpenInference auto-instrumentation for Pipecat with turn- and service-level tracing, examples, and comprehensive tests, plus CI integration.
- New package (`python/instrumentation/openinference-instrumentation-pipecat/`): adds `PipecatInstrumentor` to auto-inject `OpenInferenceObserver` into `PipelineTask` for Pipecat.
- Adds a tracing example (`examples/trace/001-trace.py`) and an env template.
- Adds `pyproject.toml` with entry points and extras; updates `python/tox.ini` to include pipecat environments.
- Updates `.gitignore` to ignore `*.code-workspace`.

Written by Cursor Bugbot for commit f53ef4e.