-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharducam_polling.py
129 lines (99 loc) · 3.84 KB
/
arducam_polling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""Arducam polling testbench"""
# Add ../src (relative to this file) to the Python path so the project modules below can be imported
import os, sys
sys.path.append(os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', "src")))
from constants import VISIBLE_SHAPE, STREAM_UDP_PORT
from misc.frame_event import NewFrameEvent
from multiprocessing import Array, Queue
from misc.monitor import MonitorClient
from ctypes import c_uint8
from misc.logs import *
import numpy as np
import logging
import cv2
from arducam import Arducam
# from stubs.arducam_random import Arducam
# from stubs.arducam_webcam import Arducam
def main():
    """Run the interactive Arducam polling testbench.

    Sets up logging, a shared-memory frame buffer and a new-frame event,
    then loops forever: it supervises the log listener and the Arducam
    worker, shows the latest frame copied out of shared memory in the
    "memory" window, shows whatever the monitor client returns in the
    "monitor" window, and reacts to keys typed into the OpenCV windows:

    * ``s`` -- start the Arducam worker
    * ``p`` -- stop the Arducam worker
    * ``q`` -- quit (Ctrl+C has the same effect)

    Unexpected errors raised inside the loop are logged rather than
    propagated. The monitor client, the camera and the log listener are
    always stopped before the function returns.
    """
    # Configure the main log, then get a logger for this module
    configure_main_log(False, True)
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Queue for workers to log to
    logging_queue = Queue(10)

    # Shared-memory byte buffer sized to hold one frame of VISIBLE_SHAPE
    dummy = np.ndarray(shape=VISIBLE_SHAPE, dtype='uint8')
    mem = Array(c_uint8, dummy.nbytes, lock=True)
    frame_lock = mem.get_lock()
    # Numpy array backed by the shared memory, and a private copy to display
    frame_src = np.ndarray(shape=VISIBLE_SHAPE, dtype='uint8', buffer=mem.get_obj())
    frame = np.empty_like(frame_src)

    # Master event object for new frames; each process that reads frame
    # data gets its own child event
    new_frame_parent = NewFrameEvent()
    new_frame_child = new_frame_parent.get_child()

    # Instantiate launchers
    cam = Arducam()

    # Open the client monitor and register its port with the camera
    monitor_port = 12352
    monitor = MonitorClient(monitor_port)
    cam.streaming_ports.append(monitor_port)
    # Use this for testing visible livestreaming
    cam.streaming_ports.append(STREAM_UDP_PORT)

    # Windows used to display the frames
    cv2.namedWindow("memory", cv2.WINDOW_NORMAL)
    cv2.namedWindow("monitor", cv2.WINDOW_NORMAL)

    # Bound before the try so the cleanup below never hits an unbound name
    logging_thread = None
    try:
        # Start thread to emit worker log messages
        logging_thread = QueueListener(logging_queue)
        logging_thread.start()

        running = False
        while True:
            # Check log listener status
            if not logging_thread.running():
                logger.warning("Log listener died. Restarting...")
                logging_thread.start()

            # Check worker status: act when the worker's actual state
            # disagrees with the state requested from the keyboard
            if cam.running() != running:
                # Explicit raise instead of assert: asserts vanish under -O
                if not cam.handle_exceptions():
                    raise RuntimeError("Arducam polling process not recoverable")
                logger.warning("Attempting to restart Arducam process")
                cam.start(mem, new_frame_parent, logging_queue)

            # Check for new frame in memory
            if running and new_frame_child.is_set():
                # Only consume the event once the lock is actually held; on a
                # timeout the event stays set so the frame is retried on the
                # next pass, and key handling below still runs.
                if frame_lock.acquire(timeout=0.5):
                    try:
                        new_frame_child.clear()
                        # Copy frame from shared memory
                        np.copyto(frame, frame_src)
                    finally:
                        frame_lock.release()
                    # Show frame
                    cv2.imshow("memory", frame)
                else:
                    logger.debug("Timed out waiting for the frame lock")

            # Show output on debug monitor
            ret, monitor_frame = monitor.read()
            if ret:
                cv2.imshow("monitor", monitor_frame)

            # Handle commands
            key = cv2.waitKey(10)
            if key == ord('p'):
                logger.info("stopping workers")
                running = False
                cam.stop()
                if not cam.handle_exceptions():
                    raise RuntimeError("Arducam shutdown failed")
            elif key == ord('s'):
                logger.info("starting worker")
                running = True
                cam.start(mem, new_frame_parent, logging_queue)
            elif key == ord('q'):
                logger.info("quitting")
                break
    except KeyboardInterrupt:
        logger.info("quitting")
    except Exception:
        # Top-level boundary of the testbench: log the traceback and fall
        # through to cleanup instead of crashing
        logger.exception("")
    finally:
        monitor.stop()
        cam.stop()
        if logging_thread is not None:
            logging_thread.stop()
        logger.info("test ended")
if __name__ == '__main__':
    try:
        main()
    finally:
        # Close the OpenCV windows even if main() propagates an error,
        # e.g. from the setup code that runs before its own try block.
        cv2.destroyAllWindows()