-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
334 lines (303 loc) · 14.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
import cv2
from datetime import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import os
import glob
from threading import Thread
class WebCam:
"""Class to monitor a specified area using a webcam and to send
email alerts based on detected movement within that area.
"""
# Duration threshold (2-minute) in SECONDS for staying in the camera view
DURATION_THRESHOLD = 5
def __init__(self, camera_index: int = 0):
"""Method to initialise an instance of the WebCam class. It sets
up the webcam device for capturing video and initializes various
attribute variables used for monitoring and motion detection within
the specified area.
Arguments:
- camera_index (int): The index of the camera device to be used for
video capture. The default value, 0, typically refers to the built-in
webcam on a laptop or the first external webcam detected.
"""
# Initialize the video capture on the specified camera index
self.video = cv2.VideoCapture(camera_index)
# Initialize first_frame to None, to be set on first loop of ...
# ...video capture
self.first_frame = None
# Initialize an empty list to track changes in motion detection ...
# ...status
self.status_list = []
# Initialize entry_time to None, to be set and updated when ...
# ...motion is first detected
self.entry_time = None
# Set the initial state of the email alert flag to False
self.alert_email_sent = False
def send_email(
self, entry_time: datetime, in_camera: bool,
attachment_filename: str):
"""Method to compose and send an email notification based on the
motion detection status within the monitored area.
Args:
- entry_time (datetime): The timestamp when the motion was
first detected.
- in_camera (bool): A flag indicating whether the object is in the
camera view or has left.
- attachment_filename (str): The file path of the image captured
during the detection, used as an email attachment.
"""
# Generate a unique incident ID based on the entry time
incident_id = "incident-" + entry_time.strftime("%Y%m%d-%H%M%S")
# Format the entry time to string
entry_time_str = entry_time.strftime("%b %d, %Y, at %I:%M:%S %p")
# Check if the detected object is in the camera view
if in_camera:
# Calculate the duration threshold in minutes
duration_threshold_mins = round(self.DURATION_THRESHOLD / 60)
# Define the email subject line for the security alert email
subject = (
f"Security Alert {incident_id}: Unusual Prolonged "
"Presence Detected"
)
# Read the content from the template for alert emails
with open(
"./email-body-templates/alert-email-html-body.txt", "r"
) as file:
body = file.read()
# Format the email template string to construct the HTML body ...
# ...of the alert email
body = body%(
entry_time_str,
duration_threshold_mins,
incident_id,
entry_time_str,
duration_threshold_mins,
)
else:
# Capture the current time as the exit time
exit_time = datetime.now()
# Format the exit time to string
exit_time_str = exit_time.strftime("%b %d, %Y, at %I:%M:%S %p")
# Define the email subject line for the security update email
subject = (
f"Security Update on {incident_id}: Detected Individual Has "
"Left the Monitored Area"
)
# Read the content from the template for update emails
with open(
"./email-body-templates/update-email-html-body.txt", "r"
) as file:
body = file.read()
# Format the email template string to construct the HTML body ...
# ...of the update email
body = body%(
incident_id,
entry_time_str,
exit_time_str,
round((exit_time - entry_time).seconds / 60),
)
# Set up Email account credentials and SMTP server details
sender_email = "[SENDER-EMAIL-ADDRESS]"
recipient_email = "[RECIPIENT-EMAIL-ADDRESS]"
# To generate an app password for this program, go to ...
# ...https://myaccount.google.com/security, search ...
# ..."App Passwords", and create one
sender_password = "[YOUR-APP-PASSWORD]"
smtp_server = "smtp.gmail.com"
# Port number for SSL
smtp_port = 465
path_to_attachment = attachment_filename
# Create a MIMEMultipart object to combine different parts ...
# ...of the email
message = MIMEMultipart()
message["Subject"] = subject
message["From"] = sender_email
message["To"] = recipient_email
# Attach the HTML body part to the email
body_part = MIMEText(body, "html")
message.attach(body_part)
# Check if the file name of attachment is provided
if attachment_filename:
# If so, attach the file to the email
with open(path_to_attachment, "rb") as file:
message.attach(
MIMEApplication(file.read(),
Name=attachment_filename)
)
# Login to the SMTP server and send the email
with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
server.login(sender_email, sender_password)
server.sendmail(sender_email,
recipient_email,
message.as_string())
# Print a console message confirming the email has been sent
if in_camera:
print(
f"{incident_id}: An alert email containing a captured image "
f"of the detected object has been sent to {recipient_email}."
)
else:
print(
f"{incident_id}: An update email has been sent to notify "
"that the detected individual has left the monitored area."
)
def clean_folder(self):
"""Method to clean up any temporary JPEG image files that were saved
during the monitoring and alerting process.
"""
# Find all JPEG files in the current directory
for image in glob.glob("*.jpg"):
# Delete each temporary image
os.remove(image)
def execute_concurrently_in_background(
self, target_process,
process_args: tuple = ()):
"""Execute a given process concurrently in a background thread
Args:
- target_process (function): The function to be executed in the
background.
- process_args (tuple, optional): Arguments to pass to the
target_process function.
"""
# Create a new thread to run the given function with its arguments
thread = Thread(target=target_process, args=process_args)
# Set the given function to be executed in the background
thread.daemon = True
# Start to execute the function concurrently in the background
thread.start()
def monitor(self):
"""
Continuously monitors the webcam feed to detect significant motion
and manage alerts.
This method captures frames from the webcam, processes them to detect
motion in real-time, and takes action based on the presence or
absence of significant motion. Actions include capturing images,
sending alert emails, and managing internal state related to motion
detection.
"""
# Start an infinite loop for continuous monitoring
while True:
# Initiate the detection status variable with 0 indicating ...
# ...no motion detected initially
status = 0
# Read the current frame from the video capture
_, frame = self.video.read()
# Convert the current frame to grayscale to simplify the ...
# ...image processing
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Apply Gaussian Blur to smooth the current frame, reducing ...
# ...detail and noise
gray_frame_gau = cv2.GaussianBlur(gray_frame, (21, 21), 0)
# Initialize the first frame to be used as a baseline for ...
# ...motion detection
if self.first_frame is None:
self.first_frame = gray_frame_gau
# Append initial status to the status_list
self.status_list.append(status)
# Skip to process the next frame directly
continue
# Calculate the absolute difference between the current frame ...
# ...and the first frame
delta_frame = cv2.absdiff(self.first_frame, gray_frame_gau)
# Apply a threshold to the delta frame to raise the array value ...
# ...that is higher than 60 to 255. The resulted frame should ...
# ...only consist of either 0 (black) or 255 (white)
thresh_frame = cv2.threshold(delta_frame, 60, 255,
cv2.THRESH_BINARY)[1]
# Dilate the thresholded frame to fill in holes, making ...
# ...contours more detectable
dil_frame = cv2.dilate(thresh_frame, None, iterations=2)
# Find contours in the dilated frame which represent ...
# ...boundaries of moving objects
contours, _ = cv2.findContours(
dil_frame, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Iterate through each found contour
for contour in contours:
# Check the area of the contour to determine whether ...
# ...it's a target object
if cv2.contourArea(contour) >= 5000:
# If so, compute the attributes of the bounding ...
# ...box of the contour, including (x, y) of the ...
# ...top left corner, and width and height of the ...
# ...bounding box
x, y, w, h = cv2.boundingRect(contour)
# Draw a rectangle on the current original frame ...
# ...based on the bounding box attributes
cv2.rectangle(frame, (x, y), (x + w, y + h),
(0, 255, 0), 3)
# Set status to 1 to indicate motion detected
status = 1
# Check if there is a transition from no motion to motion detected
if (status == 1) and (self.status_list[-1] == 0):
# Record the current time as the entry time when motion is ...
# ...first detected
self.entry_time = datetime.now()
# Check if there is a transition from motion detected to no motion
if (status == 0) and (self.status_list[-1] == 1):
# Check if an alert email has already been sent for the ...
# ...detected motion
if self.alert_email_sent:
# Execute the send_email method concurrently in the ...
# ...background to send a security update email that ...
# ...notifies the object has left the monitored area
self.execute_concurrently_in_background(
target_process=self.send_email,
process_args=(self.entry_time, False, None),
)
# Reset the alert-email-sent flag back to False
self.alert_email_sent = False
# Reset the entry time back to None
self.entry_time = None
# Check if an object has been detected to enter the monitored area
if self.entry_time is not None:
# Capture the current time
current_time = datetime.now()
# Calculate the duration in seconds since the motion ...
# ...was first detected
duration_sec = (current_time - self.entry_time).seconds
# Compare the duration against the predefined threshold ...
# ...to determine if it is an extended presence
if duration_sec >= self.DURATION_THRESHOLD:
# If so, check if an alert email has not already been ...
# ...sent for this presence
if not self.alert_email_sent:
# Generate a filename for saving an image of the ...
# ...detected object using the current timestamp
file_name = "detected_object_{}.jpg".format(
current_time.strftime("%Y%m%d%H%M%S")
)
# Save the current frame as a JPEG file
cv2.imwrite(file_name, frame)
# Execute the send_email method concurrently in ...
# ...the background to send a security alert email ...
# ...that notifies users of the extended presence
self.execute_concurrently_in_background(
target_process=self.send_email,
process_args=(self.entry_time, True, file_name),
)
# Set the alert-email-sent flag to True to prevent ...
# ...sending duplicate alert emails
self.alert_email_sent = True
# Append the current status to the status list
self.status_list.append(status)
# Display the frame with detected motion rectangles
cv2.imshow("Monitoring video with rectangles", frame)
# Wait for a key press for interruption
key = cv2.waitKey(1)
# If 'q' is pressed, break the loop
if key == ord("q"):
break
# Release the video capture device
self.video.release()
# Clean up stored temporary images to free space
self.clean_folder()
# Check if this script is being run directly (and not imported as a module)
if __name__ == "__main__":
# Initialize the WebCam object
camera = WebCam(camera_index=0)
# Start the monitoring process
camera.monitor()