Skip to content

Commit

Permalink
saving video output in stream is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
Mehrab-Shahbazi committed Jan 1, 2025
1 parent 33a502f commit 2f9ef19
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 121 deletions.
137 changes: 71 additions & 66 deletions deepface/DeepFace.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ def build_model(model_name: str, task: str = "facial_recognition") -> Any:


def verify(
img1_path: Union[str, np.ndarray, List[float]],
img2_path: Union[str, np.ndarray, List[float]],
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
silent: bool = False,
threshold: Optional[float] = None,
anti_spoofing: bool = False,
img1_path: Union[str, np.ndarray, List[float]],
img2_path: Union[str, np.ndarray, List[float]],
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
silent: bool = False,
threshold: Optional[float] = None,
anti_spoofing: bool = False,
) -> Dict[str, Any]:
"""
Verify if an image pair represents the same person or different persons.
Expand Down Expand Up @@ -164,14 +164,14 @@ def verify(


def analyze(
img_path: Union[str, np.ndarray],
actions: Union[tuple, list] = ("emotion", "age", "gender", "race"),
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
silent: bool = False,
anti_spoofing: bool = False,
img_path: Union[str, np.ndarray],
actions: Union[tuple, list] = ("emotion", "age", "gender", "race"),
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
silent: bool = False,
anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Analyze facial attributes such as age, gender, emotion, and race in the provided image.
Expand Down Expand Up @@ -263,20 +263,20 @@ def analyze(


def find(
img_path: Union[str, np.ndarray],
db_path: str,
model_name: str = "VGG-Face",
distance_metric: str = "cosine",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
threshold: Optional[float] = None,
normalization: str = "base",
silent: bool = False,
refresh_database: bool = True,
anti_spoofing: bool = False,
batched: bool = False,
img_path: Union[str, np.ndarray],
db_path: str,
model_name: str = "VGG-Face",
distance_metric: str = "cosine",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
threshold: Optional[float] = None,
normalization: str = "base",
silent: bool = False,
refresh_database: bool = True,
anti_spoofing: bool = False,
batched: bool = False,
) -> Union[List[pd.DataFrame], List[List[Dict[str, Any]]]]:
"""
Identify individuals in a database
Expand Down Expand Up @@ -369,15 +369,15 @@ def find(


def represent(
img_path: Union[str, np.ndarray],
model_name: str = "VGG-Face",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
anti_spoofing: bool = False,
max_faces: Optional[int] = None,
img_path: Union[str, np.ndarray],
model_name: str = "VGG-Face",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
anti_spoofing: bool = False,
max_faces: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Represent facial images as multi-dimensional vector embeddings.
Expand Down Expand Up @@ -441,15 +441,16 @@ def represent(


def stream(
db_path: str = "",
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enable_face_analysis: bool = True,
source: Any = 0,
time_threshold: int = 5,
frame_threshold: int = 5,
anti_spoofing: bool = False,
db_path: str = "",
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enable_face_analysis: bool = True,
source: Any = 0,
time_threshold: int = 5,
frame_threshold: int = 5,
anti_spoofing: bool = False,
output_path: Optional[str] = None, # New parameter
) -> None:
"""
Run real time face recognition and facial attribute analysis
Expand Down Expand Up @@ -478,6 +479,9 @@ def stream(
frame_threshold (int): The frame threshold for face recognition (default is 5).
anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
output_path (str): Path to save the output video. If None, no video is saved.
Returns:
None
"""
Expand All @@ -495,19 +499,20 @@ def stream(
time_threshold=time_threshold,
frame_threshold=frame_threshold,
anti_spoofing=anti_spoofing,
output_path=output_path, # Pass the output_path to analysis
)


def extract_faces(
img_path: Union[str, np.ndarray],
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
grayscale: bool = False,
color_face: str = "rgb",
normalize_face: bool = True,
anti_spoofing: bool = False,
img_path: Union[str, np.ndarray],
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
grayscale: bool = False,
color_face: str = "rgb",
normalize_face: bool = True,
anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Extract faces from a given image
Expand Down Expand Up @@ -584,11 +589,11 @@ def cli() -> None:


def detectFace(
img_path: Union[str, np.ndarray],
target_size: tuple = (224, 224),
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
img_path: Union[str, np.ndarray],
target_size: tuple = (224, 224),
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
) -> Union[np.ndarray, None]:
"""
Deprecated face detection function. Use extract_faces for same functionality.
Expand Down
99 changes: 44 additions & 55 deletions deepface/modules/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,29 @@ def analysis(
time_threshold=5,
frame_threshold=5,
anti_spoofing: bool = False,
output_path: Optional[str] = None, # New parameter
):
"""
Run real time face recognition and facial attribute analysis
Run real-time face recognition and facial attribute analysis, with optional video output.
Args:
db_path (string): Path to the folder containing image files. All detected faces
in the database will be considered in the decision-making process.
model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512,
OpenFace, DeepFace, DeepID, Dlib, ArcFace, SFace and GhostFaceNet (default is VGG-Face)
detector_backend (string): face detector backend. Options: 'opencv', 'retinaface',
'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8', 'yolov11n', 'yolov11s', 'yolov11m',
'centerface' or 'skip' (default is opencv).
distance_metric (string): Metric for measuring similarity. Options: 'cosine',
'euclidean', 'euclidean_l2' (default is cosine).
enable_face_analysis (bool): Flag to enable face analysis (default is True).
source (Any): The source for the video stream (default is 0, which represents the
default camera).
time_threshold (int): The time threshold (in seconds) for face recognition (default is 5).
frame_threshold (int): The frame threshold for face recognition (default is 5).
anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
db_path (str): Path to the folder containing image files.
model_name (str): Model for face recognition.
detector_backend (str): Face detector backend.
distance_metric (str): Metric for measuring similarity.
enable_face_analysis (bool): Flag to enable face analysis.
source (Any): The source for the video stream (camera index or video file path).
time_threshold (int): Time threshold (in seconds) for face recognition.
frame_threshold (int): Frame threshold for face recognition.
anti_spoofing (bool): Flag to enable anti-spoofing.
output_path (str): Path to save the output video. If None, no video is saved.
Returns:
None
"""
# initialize models
# Initialize models
build_demography_models(enable_face_analysis=enable_face_analysis)
build_facial_recognition_model(model_name=model_name)
# call a dummy find function for db_path once to create embeddings before starting webcam
_ = search_identity(
detected_face=np.zeros([224, 224, 3]),
db_path=db_path,
Expand All @@ -78,35 +65,40 @@ def analysis(
model_name=model_name,
)

cap = cv2.VideoCapture(source if isinstance(source, str) else int(source))
if not cap.isOpened():
logger.error(f"Cannot open video source: {source}")
return

# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*"mp4v") # Codec for output file

# Initialize video writer if output_path is provided
video_writer = None
if output_path:
video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

freezed_img = None
freeze = False
num_frames_with_faces = 0
tic = time.time()

# If source is an integer, use it as a webcam index. Otherwise, treat it as a video file path.
if isinstance(source, int):
cap = cv2.VideoCapture(source) # webcam
else:
cap = cv2.VideoCapture(str(source)) # video file
while True:
has_frame, img = cap.read()
if not has_frame:
break

# we are adding some figures into img such as identified facial image, age, gender
# that is why, we need raw image itself to make analysis
raw_img = img.copy()

faces_coordinates = []
if freeze is False:

if not freeze:
faces_coordinates = grab_facial_areas(
img=img, detector_backend=detector_backend, anti_spoofing=anti_spoofing
)

# we will pass img to analyze modules (identity, demography) and add some illustrations
# that is why, we will not be able to extract detected face from img clearly
detected_faces = extract_facial_areas(img=img, faces_coordinates=faces_coordinates)

img = highlight_facial_areas(img=img, faces_coordinates=faces_coordinates)
img = countdown_to_freeze(
img=img,
Expand All @@ -116,22 +108,18 @@ def analysis(
)

num_frames_with_faces = num_frames_with_faces + 1 if len(faces_coordinates) else 0

freeze = num_frames_with_faces > 0 and num_frames_with_faces % frame_threshold == 0

if freeze:
# add analyze results into img - derive from raw_img
img = highlight_facial_areas(
img=raw_img, faces_coordinates=faces_coordinates, anti_spoofing=anti_spoofing
)

# age, gender and emotion analysis
img = perform_demography_analysis(
enable_face_analysis=enable_face_analysis,
img=raw_img,
faces_coordinates=faces_coordinates,
detected_faces=detected_faces,
)
# facial recogntion analysis
img = perform_facial_recognition(
img=img,
faces_coordinates=faces_coordinates,
Expand All @@ -141,30 +129,31 @@ def analysis(
distance_metric=distance_metric,
model_name=model_name,
)

# freeze the img after analysis
freezed_img = img.copy()

# start counter for freezing
tic = time.time()
logger.info("freezed")
logger.info("Image frozen for analysis")

elif freeze is True and time.time() - tic > time_threshold:
elif freeze and time.time() - tic > time_threshold:
freeze = False
freezed_img = None
# reset counter for freezing
tic = time.time()
logger.info("freeze released")
logger.info("Freeze released")

freezed_img = countdown_to_release(img=freezed_img, tic=tic, time_threshold=time_threshold)
display_img = img if freezed_img is None else freezed_img

cv2.imshow("img", img if freezed_img is None else freezed_img)
# Save the frame to output video if writer is initialized
if video_writer:
video_writer.write(display_img)

if cv2.waitKey(1) & 0xFF == ord("q"): # press q to quit
cv2.imshow("img", display_img)
if cv2.waitKey(1) & 0xFF == ord("q"):
break

# kill open cv things
# Release resources
cap.release()
if video_writer:
video_writer.release()
cv2.destroyAllWindows()


Expand Down

0 comments on commit 2f9ef19

Please sign in to comment.