Skip to content

Commit 73350a3

Browse files
committed
test(vision): add unit test for camera reopen on parameter updates (backend and camera_index) using mocked VideoCapture/YOLO/threads
1 parent 4296217 commit 73350a3

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import types
2+
import numpy as np
3+
import rclpy
4+
from rclpy.parameter import Parameter
5+
6+
7+
class _DummyThread:
8+
def __init__(self, *args, **kwargs):
9+
pass
10+
11+
def start(self):
12+
# Do not start background work in tests
13+
return None
14+
15+
def join(self, *args, **kwargs):
16+
return None
17+
18+
19+
class _FakeCap:
20+
instances = []
21+
22+
def __init__(self, src, backend=None):
23+
self.src = src
24+
self.backend = backend
25+
self._opened = True
26+
self.props = {}
27+
_FakeCap.instances.append(self)
28+
29+
def isOpened(self):
30+
return self._opened
31+
32+
def set(self, prop, value):
33+
self.props[prop] = value
34+
return True
35+
36+
def get(self, prop):
37+
return self.props.get(prop, 0)
38+
39+
def read(self):
40+
# Return a small dummy frame
41+
frame = np.zeros((10, 10, 3), dtype=np.uint8)
42+
return True, frame
43+
44+
def release(self):
45+
self._opened = False
46+
47+
48+
class _FakeYOLO:
49+
def __init__(self, name):
50+
self.names = ['person', 'bottle', 'cup']
51+
52+
def to(self, device):
53+
return self
54+
55+
def half(self):
56+
return self
57+
58+
def __call__(self, frame, verbose=False, conf=0.5):
59+
# Minimal result with no detections
60+
r = types.SimpleNamespace()
61+
r.boxes = []
62+
return [r]
63+
64+
65+
def test_camera_reopen_on_param_update(monkeypatch):
66+
# Patch heavy/IO dependencies
67+
import cv2
68+
import ultralytics
69+
monkeypatch.setattr(cv2, 'VideoCapture', _FakeCap)
70+
monkeypatch.setattr(ultralytics, 'YOLO', _FakeYOLO)
71+
monkeypatch.setattr('threading.Thread', _DummyThread)
72+
73+
rclpy.init()
74+
try:
75+
from robo_pointer_visual.vision_node import VisionNode
76+
node = VisionNode()
77+
try:
78+
# Initially one VideoCapture instance created on first _configure_camera call
79+
# (not invoked automatically because we stubbed threads), so call it manually
80+
node._configure_camera()
81+
initial_instances = len(_FakeCap.instances)
82+
83+
# Update backend -> should trigger reopen
84+
res = node._on_set_parameters([
85+
Parameter('camera_backend', Parameter.Type.STRING, 'v4l2'),
86+
])
87+
assert res.successful
88+
assert len(_FakeCap.instances) >= initial_instances + 1
89+
90+
# Update camera_index -> should also trigger reopen and change source
91+
res = node._on_set_parameters([
92+
Parameter('camera_index', Parameter.Type.STRING, '/dev/camera_robot'),
93+
])
94+
assert res.successful
95+
assert node.camera_capture_source == '/dev/camera_robot'
96+
assert len(_FakeCap.instances) >= initial_instances + 2
97+
finally:
98+
node.destroy_node()
99+
finally:
100+
if rclpy.ok():
101+
rclpy.shutdown()
102+

0 commit comments

Comments
 (0)