Skip to content

Commit facedf2

Browse files
committed
fix: critical video feed mismatch and VideoForwarder resource leaks
CRITICAL FIXES:

1. Video Feed Mismatch (LLM getting wrong video)
   - When YOLO/video processors are used, the LLM was receiving an empty processed track
   - Root cause: shared_forwarder was created from the RAW track but the LLM was given the processed track
   - Fix: Create separate forwarders for raw and processed video tracks
   - Now the LLM correctly receives YOLO-annotated frames when using pose detection

2. VideoForwarder Resource Leaks
   - Consumer tasks were never removed from the _tasks set (memory leak)
   - Fix: Add task.add_done_callback(self._task_done) to clean up tasks
   - Producer exceptions were silently swallowed
   - Fix: Log and re-raise exceptions for proper error handling

3. Race Condition in VideoForwarder.stop()
   - Used a list() snapshot for cancellation but the original set for gather()
   - Fix: Use tasks_snapshot consistently throughout stop()

4. Multiple start() Protection
   - No guard against calling start() multiple times
   - Fix: Add a _started flag and return early with a warning

5. Missing VideoForwarder Cleanup in Agent
   - Forwarders were created but never stopped on agent.close()
   - Fix: Track all forwarders and stop them in the close() method

These fixes prevent resource leaks, ensure correct video routing, and improve error visibility for production debugging.
1 parent fbc1759 commit facedf2

File tree

2 files changed

+80
-28
lines changed

2 files changed

+80
-28
lines changed

agents-core/vision_agents/core/agents/agents.py

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,15 @@ async def close(self):
302302

303303
for processor in self.processors:
304304
processor.close()
305+
306+
# Stop all video forwarders
307+
if hasattr(self, '_video_forwarders'):
308+
for forwarder in self._video_forwarders:
309+
try:
310+
await forwarder.stop()
311+
except Exception as e:
312+
self.logger.error(f"Error stopping video forwarder: {e}")
313+
self._video_forwarders.clear()
305314

306315
# Close Realtime connection
307316
if self._realtime_connection:
@@ -579,44 +588,59 @@ async def _process_track(self, track_id: str, track_type: str, participant):
579588
# Import VideoForwarder
580589
from ..utils.video_forwarder import VideoForwarder
581590

582-
# Create a SHARED VideoForwarder that all consumers will subscribe to
591+
# Create a SHARED VideoForwarder for the RAW incoming track
583592
# This prevents multiple recv() calls competing on the same track
584-
shared_forwarder = VideoForwarder(
593+
raw_forwarder = VideoForwarder(
585594
track, # type: ignore[arg-type]
586595
max_buffer=30,
587596
fps=30, # Max FPS for the producer (individual consumers can throttle down)
588-
name="shared_video_forwarder",
597+
name=f"raw_video_forwarder_{track_id}",
589598
)
590-
await shared_forwarder.start()
591-
self.logger.info("🎥 Created shared VideoForwarder for track %s", track_id)
599+
await raw_forwarder.start()
600+
self.logger.info("🎥 Created raw VideoForwarder for track %s", track_id)
601+
602+
# Track forwarders for cleanup
603+
if not hasattr(self, '_video_forwarders'):
604+
self._video_forwarders = []
605+
self._video_forwarders.append(raw_forwarder)
592606

593-
# If Realtime provider supports video, tell it to watch the video
607+
# If Realtime provider supports video, determine which track to send
594608
if self.realtime_mode:
595-
# TODO: should we make this configurable? some use cases will want source, others processed track
596-
track_to_watch = track
597609
if self._video_track:
598-
self.logger.info("Forwarding processed video frames to Realtime provider")
599-
track_to_watch = self._video_track
610+
# We have a video publisher (e.g., YOLO processor)
611+
# Create a separate forwarder for the PROCESSED video track
612+
self.logger.info("🎥 Forwarding PROCESSED video frames to Realtime provider")
613+
processed_forwarder = VideoForwarder(
614+
self._video_track, # type: ignore[arg-type]
615+
max_buffer=30,
616+
fps=30,
617+
name=f"processed_video_forwarder_{track_id}",
618+
)
619+
await processed_forwarder.start()
620+
self._video_forwarders.append(processed_forwarder)
621+
622+
if isinstance(self.llm, Realtime):
623+
# Send PROCESSED frames with the processed forwarder
624+
await self.llm._watch_video_track(self._video_track, shared_forwarder=processed_forwarder)
600625
else:
601-
self.logger.info("Forwarding original video frames to Realtime provider")
602-
603-
if isinstance(self.llm, Realtime):
604-
# Pass the shared forwarder to the realtime provider
605-
await self.llm._watch_video_track(track_to_watch, shared_forwarder=shared_forwarder)
626+
# No video publisher, send raw frames
627+
self.logger.info("🎥 Forwarding RAW video frames to Realtime provider")
628+
if isinstance(self.llm, Realtime):
629+
await self.llm._watch_video_track(track, shared_forwarder=raw_forwarder)
606630

607631

608632
hasImageProcessers = len(self.image_processors) > 0
609633

610-
# video processors - pass the shared forwarder
634+
# video processors - pass the raw forwarder (they process incoming frames)
611635
for processor in self.video_processors:
612636
try:
613-
await processor.process_video(track, participant.user_id, shared_forwarder=shared_forwarder)
637+
await processor.process_video(track, participant.user_id, shared_forwarder=raw_forwarder)
614638
except Exception as e:
615639
self.logger.error(
616640
f"Error in video processor {type(processor).__name__}: {e}"
617641
)
618642

619-
# Use shared forwarder for image processors - only if there are image processors
643+
# Use raw forwarder for image processors - only if there are image processors
620644
if not hasImageProcessers:
621645
# No image processors, just keep the connection alive
622646
self.logger.info("No image processors, video processing handled by video processors only")
@@ -628,8 +652,8 @@ async def _process_track(self, track_id: str, track_type: str, participant):
628652

629653
while True:
630654
try:
631-
# Use the shared forwarder instead of competing for track.recv()
632-
video_frame = await shared_forwarder.next_frame(timeout=2.0)
655+
# Use the raw forwarder instead of competing for track.recv()
656+
video_frame = await raw_forwarder.next_frame(timeout=2.0)
633657

634658
if video_frame:
635659
# Reset error counts on successful frame processing

agents-core/vision_agents/core/utils/video_forwarder.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,31 @@ def __init__(self, input_track: VideoStreamTrack, *, max_buffer: int = 10, fps:
2424
self.fps = fps # None = unlimited, else forward at ~fps
2525
self._tasks: set[asyncio.Task] = set()
2626
self._stopped = asyncio.Event()
27+
self._started = False
2728
self.name = name
2829

2930
# ---------- lifecycle ----------
3031
async def start(self) -> None:
32+
if self._started:
33+
logger.warning("%s: start() called but already started", self.name)
34+
return
35+
self._started = True
3136
self._stopped.clear()
32-
self._tasks.add(asyncio.create_task(self._producer()))
37+
task = asyncio.create_task(self._producer())
38+
task.add_done_callback(self._task_done)
39+
self._tasks.add(task)
3340

3441
async def stop(self) -> None:
42+
if not self._started:
43+
return
3544
self._stopped.set()
36-
for t in list(self._tasks):
45+
self._started = False
46+
# Create snapshot of tasks to avoid race conditions
47+
tasks_snapshot = list(self._tasks)
48+
for t in tasks_snapshot:
3749
t.cancel()
38-
if self._tasks:
39-
await asyncio.gather(*self._tasks, return_exceptions=True)
50+
if tasks_snapshot:
51+
await asyncio.gather(*tasks_snapshot, return_exceptions=True)
4052
self._tasks.clear()
4153
# drain queue
4254
try:
@@ -45,6 +57,19 @@ async def stop(self) -> None:
4557
except asyncio.QueueEmpty:
4658
pass
4759

60+
def _task_done(self, task: asyncio.Task) -> None:
61+
"""Callback to remove completed tasks from the set."""
62+
self._tasks.discard(task)
63+
if task.cancelled():
64+
return
65+
# Log any exceptions from tasks
66+
try:
67+
exc = task.exception()
68+
if exc:
69+
logger.error("%s: Task failed with exception: %s", self.name, exc, exc_info=exc)
70+
except asyncio.CancelledError:
71+
pass
72+
4873
# ---------- producer (fills latest-N buffer) ----------
4974
async def _producer(self):
5075
try:
@@ -53,9 +78,9 @@ async def _producer(self):
5378
await self.queue.put_latest(frame)
5479
except asyncio.CancelledError:
5580
raise
56-
except Exception:
57-
# optional: log
58-
pass
81+
except Exception as e:
82+
logger.error("%s: Producer failed with exception: %s", self.name, e, exc_info=True)
83+
raise
5984

6085
# ---------- consumer API (pull one frame; coalesce backlog to newest) ----------
6186
async def next_frame(self, *, timeout: Optional[float] = None) -> av.VideoFrame:
@@ -165,5 +190,8 @@ async def _consumer():
165190
raise
166191
except Exception:
167192
logger.exception("unexpected error in video forwarder consumer [%s]", consumer_label)
193+
raise
168194

169-
self._tasks.add(asyncio.create_task(_consumer()))
195+
task = asyncio.create_task(_consumer())
196+
task.add_done_callback(self._task_done)
197+
self._tasks.add(task)

0 commit comments

Comments
 (0)