@@ -302,6 +302,15 @@ async def close(self):
302302
303303 for processor in self .processors :
304304 processor .close ()
305+
306+ # Stop all video forwarders
307+ if hasattr (self , '_video_forwarders' ):
308+ for forwarder in self ._video_forwarders :
309+ try :
310+ await forwarder .stop ()
311+ except Exception as e :
312+ self .logger .error (f"Error stopping video forwarder: { e } " )
313+ self ._video_forwarders .clear ()
305314
306315 # Close Realtime connection
307316 if self ._realtime_connection :
@@ -579,44 +588,59 @@ async def _process_track(self, track_id: str, track_type: str, participant):
579588 # Import VideoForwarder
580589 from ..utils .video_forwarder import VideoForwarder
581590
582- # Create a SHARED VideoForwarder that all consumers will subscribe to
591+ # Create a SHARED VideoForwarder for the RAW incoming track
583592 # This prevents multiple recv() calls competing on the same track
584- shared_forwarder = VideoForwarder (
593+ raw_forwarder = VideoForwarder (
585594 track , # type: ignore[arg-type]
586595 max_buffer = 30 ,
587596 fps = 30 , # Max FPS for the producer (individual consumers can throttle down)
588- name = "shared_video_forwarder " ,
597+ name = f"raw_video_forwarder_ { track_id } " ,
589598 )
590- await shared_forwarder .start ()
591- self .logger .info ("🎥 Created shared VideoForwarder for track %s" , track_id )
599+ await raw_forwarder .start ()
600+ self .logger .info ("🎥 Created raw VideoForwarder for track %s" , track_id )
601+
602+ # Track forwarders for cleanup
603+ if not hasattr (self , '_video_forwarders' ):
604+ self ._video_forwarders = []
605+ self ._video_forwarders .append (raw_forwarder )
592606
593- # If Realtime provider supports video, tell it to watch the video
607+ # If Realtime provider supports video, determine which track to send
594608 if self .realtime_mode :
595- # TODO: should we make this configurable? some use cases will want source, others processed track
596- track_to_watch = track
597609 if self ._video_track :
598- self .logger .info ("Forwarding processed video frames to Realtime provider" )
599- track_to_watch = self ._video_track
610+ # We have a video publisher (e.g., YOLO processor)
611+ # Create a separate forwarder for the PROCESSED video track
612+ self .logger .info ("🎥 Forwarding PROCESSED video frames to Realtime provider" )
613+ processed_forwarder = VideoForwarder (
614+ self ._video_track , # type: ignore[arg-type]
615+ max_buffer = 30 ,
616+ fps = 30 ,
617+ name = f"processed_video_forwarder_{ track_id } " ,
618+ )
619+ await processed_forwarder .start ()
620+ self ._video_forwarders .append (processed_forwarder )
621+
622+ if isinstance (self .llm , Realtime ):
623+ # Send PROCESSED frames with the processed forwarder
624+ await self .llm ._watch_video_track (self ._video_track , shared_forwarder = processed_forwarder )
600625 else :
601- self .logger .info ("Forwarding original video frames to Realtime provider" )
602-
603- if isinstance (self .llm , Realtime ):
604- # Pass the shared forwarder to the realtime provider
605- await self .llm ._watch_video_track (track_to_watch , shared_forwarder = shared_forwarder )
626+ # No video publisher, send raw frames
627+ self .logger .info ("🎥 Forwarding RAW video frames to Realtime provider" )
628+ if isinstance (self .llm , Realtime ):
629+ await self .llm ._watch_video_track (track , shared_forwarder = raw_forwarder )
606630
607631
608632 hasImageProcessers = len (self .image_processors ) > 0
609633
610- # video processors - pass the shared forwarder
634+ # video processors - pass the raw forwarder (they process incoming frames)
611635 for processor in self .video_processors :
612636 try :
613- await processor .process_video (track , participant .user_id , shared_forwarder = shared_forwarder )
637+ await processor .process_video (track , participant .user_id , shared_forwarder = raw_forwarder )
614638 except Exception as e :
615639 self .logger .error (
616640 f"Error in video processor { type (processor ).__name__ } : { e } "
617641 )
618642
619- # Use shared forwarder for image processors - only if there are image processors
643+ # Use raw forwarder for image processors - only if there are image processors
620644 if not hasImageProcessers :
621645 # No image processors, just keep the connection alive
622646 self .logger .info ("No image processors, video processing handled by video processors only" )
@@ -628,8 +652,8 @@ async def _process_track(self, track_id: str, track_type: str, participant):
628652
629653 while True :
630654 try :
631- # Use the shared forwarder instead of competing for track.recv()
632- video_frame = await shared_forwarder .next_frame (timeout = 2.0 )
655+ # Use the raw forwarder instead of competing for track.recv()
656+ video_frame = await raw_forwarder .next_frame (timeout = 2.0 )
633657
634658 if video_frame :
635659 # Reset error counts on successful frame processing
0 commit comments