-
-
Notifications
You must be signed in to change notification settings - Fork 18
/
main.py
355 lines (282 loc) · 10.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
from __future__ import annotations
import argparse
import gc
import logging
import os
import re
import threading
import time
from datetime import datetime
from multiprocessing import Process
from typing import TypedDict
import cv2
import yaml
from dotenv import load_dotenv
from src.danger_detector import DangerDetector
from src.drawing_manager import DrawingManager
from src.live_stream_detection import LiveStreamDetector
from src.monitor_logger import LoggerConfig
from src.notifiers.line_notifier import LineNotifier
from src.stream_capture import StreamCapture
# Pull settings from a .env file into the process environment
load_dotenv()

# Redis is only imported and used when not running on Windows
is_windows = os.name == 'nt'

if not is_windows:
    import redis
    from redis import Redis

    # Connection settings, each overridable through an environment variable
    redis_host: str = os.environ.get('redis_host', 'localhost')
    redis_port: int = int(os.environ.get('redis_port', '6379'))
    redis_password: str | None = os.environ.get('redis_password')

    # Module-level client used by main() and process_stream()
    # NOTE(review): decode_responses=True decodes values on read; this file
    # only writes and deletes raw PNG bytes, so confirm any reader of these
    # keys uses a client that does not try to decode them as text.
    r: Redis = redis.StrictRedis(
        host=redis_host,
        port=redis_port,
        password=redis_password,
        decode_responses=True,
    )
class _StreamConfigRequired(TypedDict):
    """Keys that every stream configuration must provide."""

    # URL of the live stream to monitor
    video_url: str


class StreamConfig(_StreamConfigRequired, total=False):
    """
    Configuration of one video stream, as passed to ``main(**config)``.

    Only ``video_url`` is required. Every other key mirrors a keyword
    argument of ``main`` that has a default value, so it may be omitted
    from the configuration.
    """

    # Key of the detection model
    model_key: str
    # Label of the stream
    label: str
    # Image name used in notifications
    image_name: str
    # LINE token used to send notifications
    line_token: str
    # Whether to run detection with a local model
    run_local: bool
    # Language used for notifications
    language: str
def main(
    logger: logging.Logger,
    video_url: str,
    model_key: str = 'yolov8x',
    label: str | None = None,
    image_name: str = 'prediction_visual',
    line_token: str | None = None,
    run_local: bool = True,
    language: str = 'en',
) -> None:
    """
    Detect hazards on a live stream, notify, log and publish frames.

    For every captured frame the function runs detection, evaluates the
    danger rules, draws the result, optionally sends a LINE notification
    and (on non-Windows hosts) stores the encoded frame in Redis.

    Args:
        logger (logging.Logger): A logger instance for logging messages.
        video_url (str): The URL of the live stream to monitor.
        model_key (str): The model key passed to the live stream detector.
            Defaults to 'yolov8x'.
        label (Optional[str]): The label of the stream. It is used as the
            detector output folder, in log lines and in the Redis key.
            Defaults to None.
        image_name (str): Image name used in notifications, log lines and
            the Redis key. Defaults to 'prediction_visual'.
        line_token (Optional[str]): The LINE token for sending
            notifications. Defaults to None.
        run_local (bool): Whether to run detection using a local model.
            Defaults to True.
        language (str): The language passed to the drawing manager and
            the danger detector. Defaults to 'en'.

    Returns:
        None
    """
    # Initialise the stream capture object
    streaming_capture = StreamCapture(stream_url=video_url)

    # Get the API URL from environment variables
    api_url = os.getenv('API_URL', 'http://localhost:5000')

    # Initialise the live stream detector
    live_stream_detector = LiveStreamDetector(
        api_url=api_url,
        model_key=model_key,
        output_folder=label,
        run_local=run_local,
    )

    # Initialise the drawing manager
    drawing_manager = DrawingManager(language=language)

    # Initialise the LINE notifier
    line_notifier = LineNotifier(line_token=line_token)

    # Initialise the DangerDetector
    danger_detector = DangerDetector(language=language)

    # Init last_notification_time to 300s ago, no microseconds
    last_notification_time = int(time.time()) - 300

    try:
        # Use the generator function to process detections
        for frame, timestamp in streaming_capture.execute_capture():
            start_time = time.time()

            # Convert UNIX timestamp to a formatted date-time string
            detection_time = datetime.fromtimestamp(
                timestamp,
            ).strftime('%Y-%m-%d %H:%M:%S')

            # Detect hazards in the frame
            datas, _ = live_stream_detector.generate_detections(frame)

            # Evaluate the danger rules for the detections
            warnings, controlled_zone_polygon = (
                danger_detector.detect_danger(datas)
            )

            # Draw the detections on the frame
            frame_with_detections = (
                drawing_manager.draw_detections_on_frame(
                    frame, controlled_zone_polygon, datas,
                )
            )

            # Save the frame with detections
            # save_file_name = f'{label}_{image_name}_{detection_time}'
            # drawing_manager.save_frame(
            #     frame_with_detections,
            #     save_file_name
            # )

            # Convert the frame to a byte array
            _, buffer = cv2.imencode('.png', frame_with_detections)
            frame_bytes = buffer.tobytes()

            # Log the detection results
            logger.info(f"{label} - {image_name}")
            logger.info(f"Detection time: {detection_time}")

            # Working hours are 07:00-17:59 by the local wall clock
            current_hour = datetime.now().hour
            is_working_hours = 7 <= current_hour < 18

            # Only consider notifying when more than 300 seconds have
            # passed since the last notification
            if (timestamp - last_notification_time) > 300:
                # Text of the "people in controlled area" warning
                # NOTE(review): the template is rendered with count='', so
                # the '{count}' placeholder may already be gone before the
                # replace below - confirm against DangerDetector.get_text.
                controlled_zone_warning_template = danger_detector.get_text(
                    'warning_people_in_controlled_area', count='',
                )

                # Create a regex pattern to match the warning
                pattern = re.escape(controlled_zone_warning_template).replace(
                    re.escape('{count}'), r'\d+',
                )

                # First warning that matches the controlled-zone pattern
                controlled_zone_warning = next(
                    (
                        warning for warning in warnings
                        if re.match(pattern, warning)
                    ),
                    None,
                )

                if controlled_zone_warning and not is_working_hours:
                    # Outside working hours only the controlled-zone
                    # warning is reported
                    message = (
                        f"{image_name}\n[{detection_time}]\n"
                        f"{controlled_zone_warning}"
                    )
                elif warnings and is_working_hours:
                    # During working hours, combine all warnings
                    message = (
                        f"{image_name}\n[{detection_time}]\n"
                        + '\n'.join(warnings)
                    )
                else:
                    message = None

                # If a notification needs to be sent
                if message:
                    notification_status = line_notifier.send_notification(
                        message, image=frame_bytes,
                    )

                    # If you want to connect to the broadcast system,
                    # do it here:
                    # broadcast_status = (
                    #     broadcast_notifier.broadcast_message(message)
                    # )
                    # logger.info(f"Broadcast status: {broadcast_status}")

                    if notification_status == 200:
                        logger.info(
                            f"Notification sent successfully: {message}",
                        )
                    else:
                        logger.error(
                            f"Failed to send notification: {message}",
                        )

                    # Update the last notification time
                    last_notification_time = int(timestamp)
            else:
                # Still inside the 300-second window after the last
                # notification
                logger.info('No warnings or outside notification time.')

            if not is_windows:
                try:
                    # Use a unique key for each thread or process
                    key = f"{label}_{image_name}".encode()

                    # Store the frame in Redis
                    r.set(key, frame_bytes)
                except Exception as e:
                    # Publishing the frame is best-effort; keep detecting
                    logger.error(f"Failed to store frame in Redis: {e}")

            end_time = time.time()

            # Calculate the processing time
            processing_time = end_time - start_time

            # Update the capture interval based on the processing time
            new_interval = int(processing_time) + 5
            streaming_capture.update_capture_interval(new_interval)

            # Log the processing time
            logger.info(f"Processing time: {processing_time:.2f} seconds")

            # Clear variables to free up memory
            del datas, frame, timestamp, detection_time
            del frame_with_detections, buffer, frame_bytes
            gc.collect()
    finally:
        # Release the capture even when detection or notification raises
        streaming_capture.release_resources()
        gc.collect()
def process_stream(config: StreamConfig) -> None:
    """
    Process a video stream based on the given configuration.

    Builds a logger, runs ``main`` with the configuration and, on
    non-Windows hosts, deletes the Redis key of the stream afterwards,
    whether ``main`` returned or raised.

    Args:
        config (StreamConfig): The configuration for the stream processing.

    Returns:
        None
    """
    # Load the logger configuration and initialise the logger
    logger = LoggerConfig().get_logger()

    try:
        # Run hazard detection on a single video stream
        main(logger, **config)
    finally:
        if not is_windows:
            # Same key layout as the one main() writes frames to
            label = config.get('label')
            image_name = config.get('image_name', 'prediction_visual')
            key = f"{label}_{image_name}"

            try:
                r.delete(key)
            except Exception as e:
                # Cleanup is best-effort: a Redis failure here must not
                # replace the exception (if any) raised by main()
                logger.error(f"Failed to delete Redis key {key}: {e}")
            else:
                logger.info(f"Deleted Redis key: {key}")
def start_process(config: StreamConfig) -> Process:
    """
    Launch a child process that runs ``process_stream`` for one stream.

    Args:
        config (StreamConfig): The configuration handed to the child.

    Returns:
        Process: The child process, already started.
    """
    worker = Process(target=process_stream, args=(config,))
    worker.start()
    return worker
def stop_process(process: Process) -> None:
    """
    Terminate a child process and wait for it to exit.

    Args:
        process (Process): The process to stop.

    Returns:
        None
    """
    # Request termination first, then block until the child has exited
    process.terminate()
    process.join()
def run_multiple_streams(
    config_file: str,
    reload_interval: float = 3600,
) -> None:
    """
    Manage multiple video streams based on a config file.

    The configuration file is re-read every ``reload_interval`` seconds.
    Streams whose ``video_url`` disappeared from the file are stopped and
    streams with a new ``video_url`` are started. The loop runs until the
    function is interrupted by an exception (for example
    KeyboardInterrupt); all child processes are stopped on the way out.

    Args:
        config_file (str): The path to the YAML configuration file.
        reload_interval (float): Seconds to wait between two reads of the
            configuration file. Defaults to 3600.

    Returns:
        None
    """
    # Child processes keyed by the video URL they handle
    running_processes: dict[str, Process] = {}

    try:
        while True:
            with open(config_file, encoding='utf-8') as file:
                # An empty YAML document loads as None; treat it as
                # "no streams configured"
                configurations = yaml.safe_load(file) or []

            current_configs = {
                config['video_url']: config for config in configurations
            }

            # Stop processes for removed configurations
            for video_url in list(running_processes):
                if video_url not in current_configs:
                    print(f"Stop workflow: {video_url}")
                    stop_process(running_processes[video_url])
                    del running_processes[video_url]

            # Start processes for new configurations
            for video_url, config in current_configs.items():
                if video_url not in running_processes:
                    print(f"Launch new workflow: {video_url}")
                    running_processes[video_url] = start_process(config)

            time.sleep(reload_interval)
    finally:
        # Do not leave child processes behind when the manager exits
        for video_url, process in running_processes.items():
            print(f"Stop workflow: {video_url}")
            stop_process(process)
        running_processes.clear()
if __name__ == '__main__':
    # Command-line entry point: the only option is the config file path
    cli = argparse.ArgumentParser(
        description='Run hazard detection on multiple video streams.',
    )
    cli.add_argument(
        '--config',
        type=str,
        default='config/configuration.yaml',
        help='Configuration file path',
    )

    # Hand the chosen configuration file to the stream manager
    run_multiple_streams(cli.parse_args().config)