Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ streamlined and repeatable process to monitor signs and signals along any roadwa
* Example CSV templates are provided to help get started making the static roadway object input file for both static objects and traffic signals.
* Video Synchronization Helper Tools: Options are provided to export the video frames and help to synchronize the video file.
* Image Labeling and animated GIF image tools: Selectable options are included to label images or create an animated GIF from multiple images.
* Extracted images now embed GPS coordinates in their EXIF metadata for easier mapping.

## Requirements
- Python 3.9
Expand Down Expand Up @@ -146,6 +147,7 @@ For Each GPX Point:
* If no, go to next GPX point

From the sight distance timestamp and synchronized video file, the frame is extracted that is closest to that time.
The resulting image file contains GPS EXIF information so coordinates can be viewed in external photo applications.

## Contributions
Contributions are welcome to the SSOSS project! If you have an idea for a new feature or have found a bug, please open an issue or submit a pull request.
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ tqdm
lxml
pillow
python-dateutil
piexif

30 changes: 16 additions & 14 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,43 @@
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile
# pip-compile requirements.in
#
geographiclib==2.0
# via geopy
geopy==2.4.0
geopy==2.4.1
# via -r requirements.in
gpxpy==1.5.0
gpxpy==1.6.2
# via -r requirements.in
imageio==2.31.5
imageio==2.37.0
# via -r requirements.in
lxml==4.9.3
lxml==5.4.0
# via -r requirements.in
numpy==1.26.0
numpy==2.3.0
# via
# -r requirements.in
# imageio
# opencv-python
# pandas
opencv-python==4.8.1.78
opencv-python==4.11.0.86
# via -r requirements.in
pandas==2.1.1
pandas==2.3.0
# via -r requirements.in
pillow==10.0.1
piexif==1.1.3
# via -r requirements.in
pillow==11.2.1
# via
# -r requirements.in
# imageio
python-dateutil==2.8.2
python-dateutil==2.9.0.post0
# via
# -r requirements.in
# pandas
pytz==2023.3.post1
pytz==2025.2
# via pandas
six==1.16.0
six==1.17.0
# via python-dateutil
tqdm==4.66.1
tqdm==4.67.1
# via -r requirements.in
tzdata==2023.3
tzdata==2025.2
# via pandas
26 changes: 26 additions & 0 deletions src/ssoss/process_road_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,32 @@ def get_speed_at_timestamp(self, ts):
break
return speed

def get_location_at_timestamp(self, ts):
"""Interpolate latitude and longitude for a given timestamp."""
point_list = self.gpx_listDF
last_point = len(point_list) - 1

if ts < point_list.loc[0][0].get_timestamp():
return None
if ts > point_list.loc[last_point][0].get_timestamp():
return None

for i in range(len(point_list) - 1):
p0 = point_list.loc[i][0]
p1 = point_list.loc[i + 1][0]
t0 = p0.get_timestamp()
t1 = p1.get_timestamp()
if t0 <= ts <= t1:
if t1 == t0:
return p0.get_location().latitude, p0.get_location().longitude
ratio = (ts - t0) / (t1 - t0)
lat0, lon0 = p0.get_location().latitude, p0.get_location().longitude
lat1, lon1 = p1.get_location().latitude, p1.get_location().longitude
lat = lat0 + (lat1 - lat0) * ratio
lon = lon0 + (lon1 - lon0) * ratio
return lat, lon
return None




Expand Down
78 changes: 65 additions & 13 deletions src/ssoss/process_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,32 @@
from tqdm import tqdm
import cv2
import imageio
from PIL import Image
import piexif


def _deg_to_dms_rational(deg_float):
deg = int(abs(deg_float))
min_float = (abs(deg_float) - deg) * 60
minute = int(min_float)
sec = int(round((min_float - minute) * 60 * 10000))
return ((deg, 1), (minute, 1), (sec, 10000))


def add_gps_exif(path, lat, lon):
"""Insert GPS EXIF tags into an image file."""
if lat is None or lon is None:
return
try:
exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "thumbnail": None}
exif_dict["GPS"][piexif.GPSIFD.GPSLatitudeRef] = b"N" if lat >= 0 else b"S"
exif_dict["GPS"][piexif.GPSIFD.GPSLatitude] = _deg_to_dms_rational(lat)
exif_dict["GPS"][piexif.GPSIFD.GPSLongitudeRef] = b"E" if lon >= 0 else b"W"
exif_dict["GPS"][piexif.GPSIFD.GPSLongitude] = _deg_to_dms_rational(lon)
exif_bytes = piexif.dump(exif_dict)
piexif.insert(exif_bytes, str(path))
except Exception as exc:
print(f"Error adding GPS EXIF to {path}: {exc}")


class ProcessVideo:
Expand Down Expand Up @@ -87,9 +113,10 @@ def sync(self, frame: int, ts):
return None

def create_pic_list_from_zip(self, i_desc_timestamps):
"""returns sight distance description text and frame of video to extract as 2 lists"""
"""Return lists of descriptions, frames, and timestamps for extraction."""
intersection_desc = []
frames = []
timestamps = []
prev_frame = 0
filename_description, time_of_sd = zip(*i_desc_timestamps)

Expand All @@ -98,13 +125,13 @@ def create_pic_list_from_zip(self, i_desc_timestamps):
if time_of_picture > 0 and time_of_picture <= self.get_duration():
frame_of_video = time_of_picture * self.fps

# build up lists if not duplicate frame
if int(frame_of_video) > int(prev_frame):
intersection_desc.append(filename_description[sd_item])
frames.append(int(frame_of_video))
timestamps.append(time_of_sd[sd_item])
prev_frame = frame_of_video

return intersection_desc, frames
return intersection_desc, frames, timestamps

def save_frame_ffmpeg(self, frame_number: int, output_path: Path) -> None:
"""Save a specific frame quickly using ffmpeg."""
Expand Down Expand Up @@ -135,7 +162,8 @@ def extract_generic_so_sightings(
project: instance of ProcessRoadObjects() class
"""

generic_so_desc, extract_frames = self.create_pic_list_from_zip(desc_timestamps)
generic_so_desc, extract_frames, ts_list = self.create_pic_list_from_zip(desc_timestamps)
gps_list = [project.get_location_at_timestamp(ts) for ts in ts_list]
image_path = Path(
self.video_dir,
"out",
Expand All @@ -144,14 +172,16 @@ def extract_generic_so_sightings(
)
image_path.mkdir(exist_ok=True, parents=True)

for desc, frame_num in tqdm(
list(zip(generic_so_desc, extract_frames)),
for desc, frame_num, gps in tqdm(
list(zip(generic_so_desc, extract_frames, gps_list)),
desc="Frame Extraction",
unit=" frame",
):
frame_name = str(desc) + ".jpg"
frame_filepath = image_path / frame_name
self.save_frame_ffmpeg(frame_num, frame_filepath)
if gps:
add_gps_exif(frame_filepath, gps[0], gps[1])
print(
f"PICTURE CAPTURED AT {frame_num}: {desc}, Saved {generic_so_desc.index(desc) + 1} picture(s) of {len(extract_frames)}"
)
Expand All @@ -166,30 +196,33 @@ def extract_sightings(
):
"""Extract sighting images from a video."""

intersection_desc, extract_frames = self.create_pic_list_from_zip(
intersection_desc, extract_frames, ts_list = self.create_pic_list_from_zip(
desc_timestamps
)
gps_list = [project.get_location_at_timestamp(ts) for ts in ts_list]
image_path = Path(
self.video_dir, "out", self.video_filepath.stem, "signal_sightings/"
)
image_path.mkdir(exist_ok=True, parents=True)

self._save_frames(intersection_desc, extract_frames, image_path)
self._save_frames(intersection_desc, extract_frames, image_path, gps_list)

if label_img:
self.img_overlay_info_box(self.video_filename, project)
if gen_gif:
self.generate_gif(desc_timestamps, project)

def _save_frames(self, descriptions, frames, image_path: Path) -> None:
def _save_frames(self, descriptions, frames, image_path: Path, gps_list=None) -> None:
"""Save frames described by ``descriptions`` and ``frames`` to disk."""

for desc, frame_num in tqdm(
for idx, (desc, frame_num) in enumerate(tqdm(
list(zip(descriptions, frames)), desc="Frame Extraction", unit=" frame"
):
)):
frame_name = str(desc) + ".jpg"
frame_filepath = image_path / frame_name
self.save_frame_ffmpeg(frame_num, frame_filepath)
if gps_list and idx < len(gps_list) and gps_list[idx]:
add_gps_exif(frame_filepath, gps_list[idx][0], gps_list[idx][1])
print(
f"PICTURE CAPTURED AT {frame_num}: {desc}, Saved {descriptions.index(desc) + 1} picture(s) of {len(frames)}"
)
Expand Down Expand Up @@ -452,6 +485,7 @@ def labels(
descriptive_label,
height_percent: tuple,
ssoss_and_descriptive=True,
ro_info=None,
):

alpha = 1 # Transparency factor.
Expand Down Expand Up @@ -549,6 +583,11 @@ def labels(
)
# save image
cv2.imwrite(output_filename, ssoss_and_descriptive_label)
if ro_info is not None:
ts = float(Path(output_filename).stem.split("-")[-1])
loc = ro_info.get_location_at_timestamp(ts)
if loc:
add_gps_exif(output_filename, loc[0], loc[1])

else:
# no ssoss label, just descriptive label (not recommended)
Expand All @@ -570,6 +609,11 @@ def labels(
2,
)
cv2.imwrite(output_filename, img_new)
if ro_info is not None:
ts = float(Path(output_filename).stem.split("-")[-1])
loc = ro_info.get_location_at_timestamp(ts)
if loc:
add_gps_exif(output_filename, loc[0], loc[1])

@staticmethod
def generate_descriptive_label(
Expand Down Expand Up @@ -626,7 +670,11 @@ def generic_so_img_overlay_info_box(self, vid_filename_dir, ro_info):
)

self.labels(
img, label_img_name, descriptive_label, label_height_percents
img,
label_img_name,
descriptive_label,
label_height_percents,
ro_info=ro_info,
)

def img_overlay_info_box(self, vid_filename_dir, ro_info):
Expand Down Expand Up @@ -665,5 +713,9 @@ def img_overlay_info_box(self, vid_filename_dir, ro_info):
)

self.labels(
img, label_img_name, descriptive_label, label_height_percents
img,
label_img_name,
descriptive_label,
label_height_percents,
ro_info=ro_info,
)