-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdebug_monitor.py
89 lines (66 loc) · 2.29 KB
/
debug_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Tests the debug monitor module"""
# Add parent directory to the Python path
import os.path as path
import sys
sys.path.append(path.normpath(path.join(path.dirname(path.abspath(__file__)), '..', "src")))
import multiprocessing as mp
from misc.monitor import *
import time
import cv2
def stream_frames(monitor_server, stop):
# Open the camera
cap = cv2.VideoCapture(0)
assert cap.isOpened()
try:
while not stop.is_set():
ret, frame = cap.read()
assert ret, "Bad frame"
# frame = np.random.randint(0, 255, (480, 640, 3), dtype='uint8')
# Show the frame using MonitorServer
monitor_server.show(frame, 12347)
# Simulate processing time
time.sleep(10e-3)
else:
raise KeyboardInterrupt
except BaseException as err:
cap.release()
if type(err) != type(KeyboardInterrupt): raise
def main():
try:
# Create a MonitorServer
monitor_server = MonitorServer()
# Create a MonitorClient
monitor_client = MonitorClient(12347) # Same port as MonitorServer
# Suspend event
stop = mp.Event()
# Create a multiprocessing process
process = mp.Process(target=stream_frames, args=(monitor_server, stop,))
# Start the process
process.start()
time.sleep(10)
# Create window
cv2.namedWindow("monitor", cv2.WINDOW_NORMAL)
# Read frames from MonitorClient
while True:
success, frame = monitor_client.read() # 1 second timeout
if not success:
continue
# Display the received frame
cv2.imshow("monitor", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
raise KeyboardInterrupt
except:
# Stop the client/server sockets
monitor_client.stop()
monitor_server.stop()
# Stop the process
stop.set()
try: process.join(timeout=3)
except TimeoutError:
print("Process did not terminate gracefully")
process.terminate()
# Close OpenCV windows
cv2.destroyAllWindows()
raise
if __name__ == "__main__":
main()