Skip to content

Commit 1550422

Browse files
committed
Implement async parallel detection for Moondream, Roboflow, and YOLO processors
- Add detection_fps parameter to control inference rate independently of video FPS
- Remove fps parameter from processors (video now runs at native rate)
- Implement parallel detection with timestamp-based result ordering
- Reduce queue sizes to minimize latency (max_buffer=5, max_queue_size=5)
- Remove redundant MoondreamVideoTrack and YOLOPoseVideoTrack classes
- Reduce ThreadPoolExecutor max_workers from 10/24 to 2
- Fix indentation in roboflow_cloud_processor.py
1 parent 1e7b65f commit 1550422

File tree

10 files changed

+224
-299
lines changed

10 files changed

+224
-299
lines changed

plugins/moondream/tests/test_moondream.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
from vision_agents.plugins.moondream import (
2323
CloudDetectionProcessor,
24-
MoondreamVideoTrack,
2524
)
25+
from vision_agents.core.utils.video_track import QueuedVideoTrack
2626
from vision_agents.plugins.moondream.moondream_utils import annotate_detections
2727

2828

@@ -48,7 +48,7 @@ def test_processor_initialization():
4848
@pytest.mark.asyncio
4949
async def test_video_track_frame_queuing(sample_frame):
5050
"""Test that video track can queue and receive frames."""
51-
track = MoondreamVideoTrack()
51+
track = QueuedVideoTrack(fps=30, max_queue_size=5)
5252
await track.add_frame(sample_frame)
5353
received_frame = await track.recv()
5454
assert received_frame is not None
@@ -58,10 +58,10 @@ async def test_video_track_frame_queuing(sample_frame):
5858

5959

6060
def test_processor_publishes_track():
61-
"""Test that processor publishes a MoondreamVideoTrack."""
61+
"""Test that processor publishes a QueuedVideoTrack."""
6262
processor = CloudDetectionProcessor(api_key="test_key")
6363
track = processor.publish_video_track()
64-
assert isinstance(track, MoondreamVideoTrack)
64+
assert isinstance(track, QueuedVideoTrack)
6565
processor.close()
6666

6767

@@ -231,9 +231,9 @@ async def mock_inference(frame_array):
231231
# Process a frame
232232
await processor._process_and_add_frame(sample_frame)
233233

234-
# Verify results were stored
235-
assert hasattr(processor, "_last_results")
236-
assert "detections" in processor._last_results
234+
# Verify cached results were stored
235+
assert hasattr(processor, "_cached_results")
236+
assert "detections" in processor._cached_results
237237
processor.close()
238238

239239

plugins/moondream/vision_agents/plugins/moondream/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@
1111
from vision_agents.plugins.moondream.detection.moondream_local_processor import (
1212
LocalDetectionProcessor,
1313
)
14-
from vision_agents.plugins.moondream.detection.moondream_video_track import (
15-
MoondreamVideoTrack,
16-
)
1714
from vision_agents.plugins.moondream.vlm.moondream_cloud_vlm import CloudVLM
1815
from vision_agents.plugins.moondream.vlm.moondream_local_vlm import LocalVLM
1916

@@ -25,5 +22,4 @@
2522
"CloudVLM",
2623
"LocalVLM",
2724
"LocalDetectionProcessor",
28-
"MoondreamVideoTrack",
2925
]

plugins/moondream/vision_agents/plugins/moondream/detection/moondream_cloud_processor.py

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
annotate_detections,
2020
parse_detection_bbox,
2121
)
22-
from vision_agents.plugins.moondream.detection.moondream_video_track import (
23-
MoondreamVideoTrack,
24-
)
2522
from vision_agents.core.utils.video_forwarder import VideoForwarder
23+
from vision_agents.core.utils.video_track import QueuedVideoTrack
2624
import moondream as md
2725

2826

@@ -54,7 +52,7 @@ class CloudDetectionProcessor(
5452
detection_fps: Rate at which to send frames for detection (default: 5.0).
5553
Lower values reduce API calls while maintaining smooth video.
5654
interval: Processing interval in seconds (default: 0)
57-
max_workers: Number of worker threads for CPU-intensive operations (default: 10)
55+
max_workers: Number of worker threads for CPU-intensive operations (default: 2)
5856
"""
5957

6058
name = "moondream_cloud"
@@ -66,7 +64,7 @@ def __init__(
6664
detect_objects: Union[str, List[str]] = "person",
6765
detection_fps: float = 5.0,
6866
interval: int = 0,
69-
max_workers: int = 10,
67+
max_workers: int = 2,
7068
):
7169
super().__init__(interval=interval, receive_audio=False, receive_video=True)
7270

@@ -76,14 +74,9 @@ def __init__(
7674
self.max_workers = max_workers
7775
self._shutdown = False
7876

79-
# Initialize state tracking attributes
80-
self._last_results: Dict[str, Any] = {}
81-
self._last_frame_time: Optional[float] = None
82-
self._last_frame_pil: Optional[Image.Image] = None
83-
84-
# Async detection state
85-
self._detection_in_progress = False
77+
# Parallel detection state - track when results were requested to handle out-of-order completion
8678
self._last_detection_time: float = 0.0
79+
self._last_result_time: float = 0.0
8780
self._cached_results: Dict[str, Any] = {"detections": []}
8881

8982
# Font configuration constants for drawing efficiency
@@ -110,8 +103,8 @@ def __init__(
110103
max_workers=max_workers, thread_name_prefix="moondream_processor"
111104
)
112105

113-
# Video track for publishing (if used as video publisher)
114-
self._video_track: MoondreamVideoTrack = MoondreamVideoTrack()
106+
# Video track for publishing at 30 FPS with minimal buffering
107+
self._video_track: QueuedVideoTrack = QueuedVideoTrack(fps=30, max_queue_size=5)
115108
self._video_forwarder: Optional[VideoForwarder] = None
116109

117110
# Initialize model
@@ -145,10 +138,10 @@ async def process_video(
145138
self._process_and_add_frame, name="moondream"
146139
)
147140
else:
148-
# Create our own VideoForwarder at default FPS
141+
# Create our own VideoForwarder at default FPS with minimal buffering
149142
self._video_forwarder = VideoForwarder(
150143
incoming_track, # type: ignore[arg-type]
151-
max_buffer=30,
144+
max_buffer=5,
152145
name="moondream_forwarder",
153146
)
154147

@@ -225,24 +218,18 @@ async def _process_and_add_frame(self, frame: av.VideoFrame):
225218
frame_array = frame.to_ndarray(format="rgb24")
226219
now = asyncio.get_event_loop().time()
227220

228-
# Check if we should start a new detection
221+
# Check if we should start a new detection based on detection_fps
229222
detection_interval = (
230223
1.0 / self.detection_fps if self.detection_fps > 0 else float("inf")
231224
)
232-
should_detect = (
233-
not self._detection_in_progress
234-
and (now - self._last_detection_time) >= detection_interval
235-
)
225+
should_detect = (now - self._last_detection_time) >= detection_interval
236226

237227
if should_detect:
238-
# Start detection in background (don't await)
239-
self._detection_in_progress = True
228+
# Start detection in background (don't await) - runs in parallel
240229
self._last_detection_time = now
241-
asyncio.create_task(self._run_detection_background(frame_array.copy()))
242-
243-
# Always use cached results for annotation (don't wait for detection)
244-
self._last_frame_time = now
245-
self._last_frame_pil = Image.fromarray(frame_array)
230+
asyncio.create_task(
231+
self._run_detection_background(frame_array.copy(), now)
232+
)
246233

247234
# Annotate frame with cached detections
248235
if self._cached_results.get("detections"):
@@ -265,19 +252,23 @@ async def _process_and_add_frame(self, frame: av.VideoFrame):
265252
# Pass through original frame on error
266253
await self._video_track.add_frame(frame)
267254

268-
async def _run_detection_background(self, frame_array: np.ndarray):
269-
"""Run detection in background and update cached results."""
255+
async def _run_detection_background(
256+
self, frame_array: np.ndarray, request_time: float
257+
):
258+
"""Run detection in background and update cached results if newer."""
270259
try:
271260
results = await self._run_inference(frame_array)
272-
self._cached_results = results
273-
self._last_results = results
274-
logger.debug(
275-
f"🔍 Detection complete: {len(results.get('detections', []))} objects"
276-
)
261+
# Only update cache if this result is newer than current cached result
262+
if request_time > self._last_result_time:
263+
self._cached_results = results
264+
self._last_result_time = request_time
265+
logger.debug(
266+
f"🔍 Detection complete: {len(results.get('detections', []))} objects"
267+
)
268+
else:
269+
logger.debug("🔍 Detection complete but discarded (newer result exists)")
277270
except Exception as e:
278271
logger.warning(f"⚠️ Background detection failed: {e}")
279-
finally:
280-
self._detection_in_progress = False
281272

282273
def close(self):
283274
"""Clean up resources."""

plugins/moondream/vision_agents/plugins/moondream/detection/moondream_local_processor.py

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
annotate_detections,
2525
handle_device,
2626
)
27-
from vision_agents.plugins.moondream.detection.moondream_video_track import (
28-
MoondreamVideoTrack,
29-
)
27+
from vision_agents.core.utils.video_track import QueuedVideoTrack
3028

3129
logger = logging.getLogger(__name__)
3230

@@ -56,7 +54,7 @@ class LocalDetectionProcessor(
5654
detection_fps: Rate at which to run detection (default: 10.0).
5755
Lower values reduce CPU/GPU load while maintaining smooth video.
5856
interval: Processing interval in seconds (default: 0)
59-
max_workers: Number of worker threads for CPU-intensive operations (default: 10)
57+
max_workers: Number of worker threads for CPU-intensive operations (default: 2)
6058
force_cpu: If True, force CPU usage even if CUDA/MPS is available (default: False).
6159
Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. We recommend running on CUDA for best performance.
6260
model_name: Hugging Face model identifier (default: "moondream/moondream3-preview")
@@ -72,7 +70,7 @@ def __init__(
7270
detect_objects: Union[str, List[str]] = "person",
7371
detection_fps: float = 10.0,
7472
interval: int = 0,
75-
max_workers: int = 10,
73+
max_workers: int = 2,
7674
force_cpu: bool = False,
7775
model_name: str = "moondream/moondream3-preview",
7876
options: Optional[AgentOptions] = None,
@@ -94,13 +92,9 @@ def __init__(
9492
else:
9593
self._device, self._dtype = handle_device()
9694

97-
self._last_results: Dict[str, Any] = {}
98-
self._last_frame_time: Optional[float] = None
99-
self._last_frame_pil: Optional[Image.Image] = None
100-
101-
# Async detection state
102-
self._detection_in_progress = False
95+
# Parallel detection state - track when results were requested to handle out-of-order completion
10396
self._last_detection_time: float = 0.0
97+
self._last_result_time: float = 0.0
10498
self._cached_results: Dict[str, Any] = {"detections": []}
10599

106100
# Font configuration constants for drawing efficiency
@@ -122,8 +116,8 @@ def __init__(
122116
max_workers=max_workers, thread_name_prefix="moondream_local_processor"
123117
)
124118

125-
# Video track for publishing (if used as video publisher)
126-
self._video_track: MoondreamVideoTrack = MoondreamVideoTrack()
119+
# Video track for publishing at 30 FPS with minimal buffering
120+
self._video_track: QueuedVideoTrack = QueuedVideoTrack(fps=30, max_queue_size=5)
127121
self._video_forwarder: Optional[VideoForwarder] = None
128122

129123
# Model will be loaded in start() method
@@ -245,10 +239,10 @@ async def process_video(
245239
self._process_and_add_frame, name="moondream_local"
246240
)
247241
else:
248-
# Create our own VideoForwarder at default FPS
242+
# Create our own VideoForwarder at default FPS with minimal buffering
249243
self._video_forwarder = VideoForwarder(
250244
incoming_track, # type: ignore[arg-type]
251-
max_buffer=30,
245+
max_buffer=5,
252246
name="moondream_local_forwarder",
253247
)
254248

@@ -317,24 +311,18 @@ async def _process_and_add_frame(self, frame: av.VideoFrame):
317311
frame_array = frame.to_ndarray(format="rgb24")
318312
now = asyncio.get_event_loop().time()
319313

320-
# Check if we should start a new detection
314+
# Check if we should start a new detection based on detection_fps
321315
detection_interval = (
322316
1.0 / self.detection_fps if self.detection_fps > 0 else float("inf")
323317
)
324-
should_detect = (
325-
not self._detection_in_progress
326-
and (now - self._last_detection_time) >= detection_interval
327-
)
318+
should_detect = (now - self._last_detection_time) >= detection_interval
328319

329320
if should_detect:
330-
# Start detection in background (don't await)
331-
self._detection_in_progress = True
321+
# Start detection in background (don't await) - runs in parallel
332322
self._last_detection_time = now
333-
asyncio.create_task(self._run_detection_background(frame_array.copy()))
334-
335-
# Always use cached results for annotation (don't wait for detection)
336-
self._last_frame_time = now
337-
self._last_frame_pil = Image.fromarray(frame_array)
323+
asyncio.create_task(
324+
self._run_detection_background(frame_array.copy(), now)
325+
)
338326

339327
# Annotate frame with cached detections
340328
if self._cached_results.get("detections"):
@@ -356,19 +344,23 @@ async def _process_and_add_frame(self, frame: av.VideoFrame):
356344
logger.exception(f"❌ Frame processing failed: {e}")
357345
await self._video_track.add_frame(frame)
358346

359-
async def _run_detection_background(self, frame_array: np.ndarray):
360-
"""Run detection in background and update cached results."""
347+
async def _run_detection_background(
348+
self, frame_array: np.ndarray, request_time: float
349+
):
350+
"""Run detection in background and update cached results if newer."""
361351
try:
362352
results = await self._run_inference(frame_array)
363-
self._cached_results = results
364-
self._last_results = results
365-
logger.debug(
366-
f"🔍 Detection complete: {len(results.get('detections', []))} objects"
367-
)
353+
# Only update cache if this result is newer than current cached result
354+
if request_time > self._last_result_time:
355+
self._cached_results = results
356+
self._last_result_time = request_time
357+
logger.debug(
358+
f"🔍 Detection complete: {len(results.get('detections', []))} objects"
359+
)
360+
else:
361+
logger.debug("🔍 Detection complete but discarded (newer result exists)")
368362
except Exception as e:
369363
logger.warning(f"⚠️ Background detection failed: {e}")
370-
finally:
371-
self._detection_in_progress = False
372364

373365
def close(self):
374366
"""Clean up resources."""

0 commit comments

Comments (0)