Skip to content

Conversation

@tbarbugli
Copy link
Member

@tbarbugli tbarbugli commented Nov 12, 2025

Summary by CodeRabbit

  • Chores
    • Restructured the audio input pipeline: introduced an incoming-audio queue, new consumer flow, and simplified audio event handling.
    • Updated audio processor interface to accept PCM-style audio payloads and adjusted logging to reflect PCM data.
    • Exposed new audio/video track attributes used by media handling.
    • Bumped getstream dependency to >=2.5.15 and applied a minor comment formatting tweak.

@coderabbitai
Copy link

coderabbitai bot commented Nov 12, 2025

Walkthrough

Audio handling refactored to use PcmData objects across agents-core and the getstream plugin: new incoming audio queue and consumer, processor API signatures changed to accept PcmData, getstream transport handler updated to derive participant from PcmData, and getstream dependency bumped.

Changes

Cohort / File(s) Summary
Build Configuration
agents-core/pyproject.toml
Bumped getstream dependency from >=2.5.14 to >=2.5.15; minor whitespace change in a commented local dev line.
Audio Queue & Processing Refactor
agents-core/vision_agents/core/agents/agents.py
Added _incoming_audio_queue, _audio_track, _video_track; on_audio_received now enqueues PcmData; added _consume_incoming_audio consumer with turn-detection; join() starts new consumer; processors invoked with PcmData.
Processor API Signature Update
agents-core/vision_agents/core/processors/base_processor.py
Imported PcmData; changed AudioProcessorMixin.process_audio to process_audio(self, audio_data: PcmData); updated AudioLogger.process_audio to accept PcmData, derive user_id from audio_data.participant, and compute byte length via audio_data.to_bytes().
GetStream Transport Handler
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
Removed Participant import/param; on_audio_received now accepts only PcmData and derives participant from pcm.participant when constructing AudioReceivedEvent; skips event when pcm is None.

Sequence Diagram(s)

sequenceDiagram
    participant StreamEdge as Stream Edge Transport
    participant Agent as Agent Audio Handler
    participant Queue as Incoming Audio Queue
    participant Consumer as Audio Consumer
    participant Processor as Audio Processor

    StreamEdge->>Agent: on_audio_received(pcm: PcmData)
    Agent->>Queue: enqueue(pcm)
    Queue->>Consumer: consume()
    Consumer->>Consumer: turn_detection(pcm)
    Consumer->>Processor: process_audio(pcm: PcmData)
    Note right of Processor `#bfe1d9`: processor reads pcm.participant\nand uses pcm.to_bytes() as needed
    Processor-->>Consumer: processed
Loading
sequenceDiagram
    actor User as Human
    participant OldFlow as Old Flow\n(bytes + participant)
    participant NewFlow as New Flow\n(PcmData)

    User->>OldFlow: audio bytes + participant
    OldFlow->>OldFlow: processors receive (bytes, Participant)

    User->>NewFlow: audio PCM wrapped as PcmData
    NewFlow->>NewFlow: single `PcmData` carries audio + participant
    NewFlow->>NewFlow: processors receive PcmData and extract participant
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review _consume_incoming_audio lifecycle, concurrency, and turn-detection correctness in agents.py.
  • Verify all processors and external callsites updated to the new process_audio(PcmData) signature.
  • Confirm pcm.participant is present in all transport/event paths and guarded where pcm may be None.
  • Check STT/realtime branches for correct use of PcmData.to_bytes() and participant extraction.

Possibly related PRs

Suggested labels

plugin-getstream

Suggested reviewers

  • dangusev

Poem

I folded the voice into a cold, small vessel,
its ribs of bytes and silence clinking like bone.
The queue breathes, an honest dark that names the speaker,
and the new signatures, slow and clean, listen for the ruin.
In the hush the participant is carried inside the sound.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: updating audio processor signatures from bytes to PcmData across multiple files and components.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch audio-processor

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@tbarbugli tbarbugli marked this pull request as ready for review November 12, 2025 13:50
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
agents-core/vision_agents/core/agents/agents.py (1)

899-903: Keep the consumer running when a processor misbehaves.
Line 899-903: If one audio processor raises, _consume_incoming_audio exits after the outer handler logs the error, and we stop forwarding audio for the remainder of the call. Wrapping each processor invocation lets us log the offender but keep the loop alive.

Apply this diff to contain failures per processor:

         for processor in self.audio_processors:
             if processor is None:
                 continue
-            await processor.process_audio(pcm)
+            try:
+                await processor.process_audio(pcm)
+            except Exception:
+                self.logger.exception(
+                    "Audio processor %s failed; skipping chunk",
+                    type(processor).__name__,
+                )
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 0eeba28 and 53138af.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • agents-core/pyproject.toml (2 hunks)
  • agents-core/vision_agents/core/agents/agents.py (7 hunks)
  • agents-core/vision_agents/core/processors/base_processor.py (4 hunks)
  • plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
agents-core/vision_agents/core/processors/base_processor.py (1)
plugins/krisp/vision_agents/plugins/krisp/turn_detection.py (1)
  • process_audio (78-119)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (2)
agents-core/vision_agents/core/agents/agents.py (1)
  • on_audio_received (305-309)
conftest.py (1)
  • participant (132-134)
agents-core/vision_agents/core/agents/agents.py (12)
agents-core/vision_agents/core/utils/audio_queue.py (3)
  • AudioQueue (12-274)
  • put (50-83)
  • get_duration (239-258)
agents-core/vision_agents/core/edge/types.py (1)
  • OutputAudioTrack (39-49)
agents-core/vision_agents/core/processors/base_processor.py (2)
  • process_audio (56-63)
  • process_audio (155-168)
agents-core/vision_agents/core/stt/stt.py (1)
  • process_audio (129-132)
agents-core/vision_agents/core/turn_detection/turn_detection.py (1)
  • process_audio (73-85)
plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py (1)
  • process_audio (83-119)
plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py (1)
  • process_audio (151-163)
plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py (1)
  • process_audio (164-175)
plugins/fish/vision_agents/plugins/fish/stt.py (1)
  • process_audio (53-140)
plugins/wizper/vision_agents/plugins/wizper/stt.py (1)
  • process_audio (62-137)
plugins/krisp/vision_agents/plugins/krisp/turn_detection.py (1)
  • process_audio (78-119)
agents-core/vision_agents/core/vad/vad.py (1)
  • process_audio (140-189)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: unit / Test "not integration"
  • GitHub Check: unit / Ruff & mypy
  • GitHub Check: unit / Test "not integration"
  • GitHub Check: unit / Ruff & mypy
🔇 Additional comments (5)
agents-core/pyproject.toml (1)

24-24: Dependency bump aligns with PCM support.
Line 24: Raising the minimum to getstream[webrtc,telemetry]>=2.5.15 matches the newly required PcmData API, so downstream imports stay consistent.

plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)

296-303: Nice to derive participant from the PCM envelope.
Line 296-303: Pulling participant straight off the PcmData keeps the signature minimal while preserving the event payload we rely on downstream.

agents-core/vision_agents/core/agents/agents.py (1)

305-310: Queueing inbound PCM keeps consumers consistent.
Line 305-310: Feeding AudioReceivedEvent chunks through _incoming_audio_queue ensures every downstream path operates on uniform PcmData windows; thanks for tightening that flow.

agents-core/vision_agents/core/processors/base_processor.py (2)

56-62: Interface docs now match the PCM contract.
Line 56-62: Updating the abstract signature and docstring to PcmData makes the expectations explicit for implementers adopting the new pipeline.


155-168: Audio logger adapts cleanly to PcmData.
Line 155-168: The logger now derives user id from audio_data.participant and reports length via the PCM conversion, keeping observability intact after the API shift.

self.audio_count += 1
user_id = "unknown"
if hasattr(audio_data, "participant") and audio_data.participant:
user_id = getattr(audio_data.participant, "user_id", "unknown")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
user_id = getattr(audio_data.participant, "user_id", "unknown")
user_id = audio_data.participant.user_id or "unknown"

pcm_data=pcm,
participant=participant,
async def on_audio_received(pcm: PcmData):
if pcm is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it would be None here?

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 70e986f and 38edcd1.

📒 Files selected for processing (1)
  • plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (3)
agents-core/vision_agents/core/agents/agents.py (1)
  • on_audio_received (305-309)
agents-core/vision_agents/core/edge/events.py (1)
  • AudioReceivedEvent (9-14)
agents-core/vision_agents/core/edge/sfu_events.py (10)
  • participant (1496-1501)
  • participant (1504-1507)
  • participant (1545-1550)
  • participant (1553-1556)
  • participant (1625-1630)
  • participant (1633-1636)
  • participant (2100-2105)
  • participant (2108-2111)
  • participant (2156-2161)
  • participant (2164-2167)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: unit / Ruff & mypy
  • GitHub Check: unit / Test "not integration"
  • GitHub Check: unit / Ruff & mypy
  • GitHub Check: unit / Test "not integration"

Comment on lines +296 to 303
async def on_audio_received(pcm: PcmData):
self.events.send(
events.AudioReceivedEvent(
plugin_name="getstream",
pcm_data=pcm,
participant=participant,
participant=pcm.participant,
)
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that pcm is never None, or add a defensive check.

The old signature allowed PcmData | None, but the new signature removes the | None union. If the getstream library can invoke this callback with None, line 301 will raise an AttributeError when accessing pcm.participant.

Run the following script to verify None handling patterns:

If the library can pass None, consider adding:

 @connection.on("audio")
 async def on_audio_received(pcm: PcmData):
+    if pcm is None:
+        logger.warning("Received None pcm data, skipping")
+        return
     self.events.send(
         events.AudioReceivedEvent(

🏁 Script executed:

#!/bin/bash
# Search for None checks on PcmData or audio events in the codebase
rg -nP --type=py -C3 'pcm.*is None|pcm_data.*is None|on_audio.*None'

# Search for getstream library documentation or examples of audio callback usage
rg -nP --type=py -C3 '@.*\.on\("audio"\)|on_audio_received'

Length of output: 3555



Let me also check if there's a requirements or setup file that shows the getstream library version:

[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Search for getstream imports and any type hints related to audio callbacks
rg -nP --type=py 'from.*getstream|import.*getstream' plugins/getstream/ | head -20

# Look for any type stubs or annotations for the connection object
rg -nP --type=py 'connection.*:.*|\.on\("audio"\)' plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py -C5

Length of output: 3377


🏁 Script executed:

#!/bin/bash
# Find dependency specifications
fd -e txt -e toml -e cfg -e lock plugins/getstream/ --exec cat {} \;

Length of output: 377


[/web_search]


🌐 Web query:

getstream SDK audio callback PcmData None handling documentation

💡 Result:

I couldn't find any official GetStream doc that explicitly describes receiving None for the PcmData audio callback. The SDK docs show the audio callback signature (connection.on("audio") with PcmData) and plugin examples but do not document a None case — see GetStream Python Video docs and plugin integration pages. [1][2]

If you want, I can:

  • show a short defensive Python handler that safely ignores or handles pcm == None (e.g., treat as silence/end-of-stream), or
  • open the GetStream SDK source (GitHub) to look for where None might be emitted and cite exact behavior.

Which would you prefer?

Sources:
[1] GetStream Python Video docs — audio callback examples.
[2] GetStream Python AI/plugin integration pages (Deepgram/FAL/Silero examples).


Add defensive None check before accessing pcm.participant.

The downstream consumer (agents.py:306-307) explicitly guards against event.pcm_data is None, proving the event system carries None values. The old signature accepted PcmData | None, but the new code removes this without adding a guard. Accessing pcm.participant at line 301 will raise AttributeError if pcm is None.

Add:

@connection.on("audio")
async def on_audio_received(pcm: PcmData):
    if pcm is None:
        return
    self.events.send(
        events.AudioReceivedEvent(
            plugin_name="getstream",
            pcm_data=pcm,
            participant=pcm.participant,
        )
    )

Alternatively, update the type hint to pcm: PcmData | None if the library can pass None, and add the guard above.

🤖 Prompt for AI Agents
In plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
around lines 296 to 303, the on_audio_received handler assumes pcm is never None
and directly accesses pcm.participant which will raise AttributeError if the
event payload is None; add a defensive guard so the handler returns early when
pcm is None (or change the type hint to PcmData | None and then check for None)
before calling self.events.send, ensuring you only construct AudioReceivedEvent
with pcm_data and participant when pcm is present.

@tbarbugli tbarbugli merged commit 4630585 into main Nov 12, 2025
6 checks passed
@tbarbugli tbarbugli deleted the audio-processor branch November 12, 2025 15:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants