Skip to content

Commit

Permalink
Merge branch 'develop' into gh-167-qaurt-review-response-route
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Unger <44368989+MyStackOverflows@users.noreply.github.com>
  • Loading branch information
MyStackOverflows authored Nov 24, 2023
2 parents 5e67cd2 + bc8337b commit 43aba3e
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 198 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/vidserver-lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Video Processing Server Lint

concurrency:
group: ${{ github.run_id }}
cancel-in-progress: true

on:
push:
branches:
- "develop"
- "master"
- "main"
pull_request:
types:
- opened
- reopened
- synchronize

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Check out the repository
uses: actions/checkout@v4
with:
token: ${{ github.token }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
cache: 'pip'
cache-dependency-path: "**/requirements*.txt"
- name: Install dependencies
run: |
pip install -r app/video-processing/requirements.txt -r app/video-processing/requirements_dev.txt
- name: Run tests
run: cd app/video-processing/ && make lint
4 changes: 2 additions & 2 deletions .github/workflows/vidserver-test-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ jobs:
cache: 'pip'
cache-dependency-path: "**/requirements.txt"
- name: Install dependencies
run: pip install -r app/video-processing/requirements.txt
run: pip install -r app/video-processing/requirements.txt -r app/video-processing/requirements_dev.txt
- name: Run tests
run: cd app/video-processing/ && python3 -m unittest discover -s .
run: cd app/video-processing/ && make test
3 changes: 3 additions & 0 deletions app/video-processing/.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ Makefile
**/*.png
test-samples/
*-videos/

# Test
test_*
13 changes: 13 additions & 0 deletions app/video-processing/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ IMAGE_BUILDER ?= podman
help: ## Display this help.
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target>\033[0m\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \033[36m%-20s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)


##@ Development

FLAKE ?= flake8
.PHONY: lint
lint: ## Lint source files
$(FLAKE) *.py

PYTEST ?= pytest
.PHONY: test
test: ## Run unit tests
$(PYTEST)

##@ Build
.PHONY: oci-build
oci-build: ## Build the OCI image for video processing server.
Expand Down
6 changes: 3 additions & 3 deletions app/video-processing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ PrivacyPal Video Processing service can be configured via the following environm
- gunicorn v21.2.0

You can installl these dependencies by navigating to `app/video-processing` and running:

```bash
pip install -r requirements.txt
pip install -r requirements.txt -r requirements_dev.txt
```


### Running tests

```bash
cd app/video-processing/ && python3 -m unittest discover -s .
cd app/video-processing/ && make test
```
3 changes: 0 additions & 3 deletions app/video-processing/env.py

This file was deleted.

37 changes: 23 additions & 14 deletions app/video-processing/process_tracker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import multiprocessing as mp, time, os
import multiprocessing as mp
import time
from utils import get_env


class ProcessTrackerObject():
process: mp.Process
Expand All @@ -11,7 +14,7 @@ class ProcessTrackerObject():
"""a float representing the number of hours this process has before it can be pruned"""

def __init__(self, process: mp.Process, expiry_period: float = 1):
self.timestamp = time.time() # number of seconds since unix epoch (jan 1 1970)
self.timestamp = time.time() # number of seconds since unix epoch
self.process = process
self.expiry_period = expiry_period

Expand All @@ -21,21 +24,22 @@ def is_expired(self):
`self.expiry_period` number of hours
"""
return (time.time() - self.timestamp) > (self.expiry_period * 3600)
def terminate_process(self):

def terminate(self):
"""
This function should be called when the ProcessTracker object is being pruned to terminate
the subprocess object contained within it or the process could be left dangling
"""
if self.process.is_alive():
self.process.terminate()

def process_is_alive(self):
def is_alive(self):
"""
This function returns the status of the process object, whether it is running or not
"""
return self.process.is_alive()


class ProcessTracker():
processes: 'dict[str, ProcessTrackerObject]' = {}
"""internal dict of `ProcessTrackerObject`s the ProcessTracker is keeping track of"""
Expand All @@ -46,19 +50,18 @@ class ProcessTracker():
is_running: bool
"""indicates if main() is running"""

_instance: "ProcessTracker" = None

def __init__(self):
self.prune_interval = 60
try:
self.prune_interval = int(os.environ["PRIVACYPAL_STATE_PRUNE_INTERVAL"])
except KeyError: pass
self.prune_interval = int(get_env("PRIVACYPAL_STATE_PRUNE_INTERVAL", 60))
self.is_running = False

def add(self, filename: str, p: ProcessTrackerObject):
"""
Adds a `ProcessTrackerObject` to be tracked.
"""
self.processes[filename] = p

def prune(self):
"""
Removes all expired `ProcessTrackerObject`s from the internal list.
Expand All @@ -67,7 +70,7 @@ def prune(self):
p = self.processes[f]
if p.is_expired():
print(f"Process on {f} has expired, pruning.")
p.terminate_process()
p.terminate()
self.processes.pop(f)

def get_process(self, filename: str):
Expand All @@ -82,12 +85,12 @@ def get_process(self, filename: str):

def terminate_processes(self):
for p in self.processes.values():
if p.process_is_alive():
p.terminate_process()
if p.is_alive():
p.terminate()

def is_any_alive(self):
for p in self.processes.values():
if p.process_is_alive():
if p.is_alive():
return True
return False

Expand All @@ -103,3 +106,9 @@ def main(self):
self.prune()
time.sleep(self.prune_interval)
print(f"ProcessTracker prune finished. Next prune in {self.prune_interval}s.")

@staticmethod
def get_instance():
if ProcessTracker._instance is None:
ProcessTracker._instance = ProcessTracker()
return ProcessTracker._instance
4 changes: 4 additions & 0 deletions app/video-processing/requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
flake8==6.1.0
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-env==1.1.1
107 changes: 64 additions & 43 deletions app/video-processing/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,52 @@
from video_processor import VideoProcessor
from process_tracker import ProcessTrackerObject, ProcessTracker
from quart import Quart, request, jsonify, utils
from env import input_path, out_path
from utils import get_env

app = Quart(__name__)
vp = VideoProcessor()
tracker = ProcessTracker()
is_stateless = False
try:
is_stateless = True if str(os.environ["PRIVACYPAL_IS_STATELESS"]) == "true" else False # if env variable not defined, will raise KeyError
except KeyError:
pass


def start_process(file: str, final: str):
def start_process(file_path: str, final: str):
"""
Expects `file` to be the name of the file, such as '23-yeehaw-1698360721.mp4'. Synchronously runs video processing and returns
"""
vp.process(f"{input_path}/{file}", final)
print(f"Done processing {file}.")
vp: VideoProcessor = app.config["PROCESSOR"]
vp.process(f"{file_path}", final)
print(f"Done processing {os.path.basename(file_path)}.")
return "Video finished processing.", 200 # indicate processing is completed


@app.route("/process_video", methods=["POST"])
async def handle_request():
file = (await request.data).decode() # expects the filename, in the form <uid>-<file name>-<epoch time> such as "23-yeehaw-1698360721.mp4"
if os.path.isfile(f"{input_path}/{file}"): # check if the file exists
final = f"{out_path}/{file[:-4]}-processed{file[-4:]}"
if not app.testing: # if we're running Flask unit tests, don't run the video processing method
if not is_stateless: # start process and send response immediately
process = mp.Process(target=vp.process, args=(f"{input_path}/{file}", final)) # define a new process pointing to VideoProcessor.process()
tracker.add(file, ProcessTrackerObject(process))
process.start() # start the process on another thread
print(f"Process started on {file}")
return "Success, file exists.", 202 # indicate processing has started
else: # redundant else but makes the code cleaner to read
print(f"Process started on {file}")
response = await utils.run_sync(start_process)(file, final)
return response
return "Success, file exists.", 202
else:
input_path = app.config["INPUT_DIR"]
output_path = app.config["OUTPUT_DIR"]

file_path = f"{input_path}/{file}"
if not os.path.isfile(file_path): # check if the file exists
return "Error: file not found", 404

dest_path = f"{output_path}/{file[:-4]}-processed{file[-4:]}"
if not app.config["IS_STATELESS"]: # start process and send response immediately
process = mp.Process(target=start_process, args=(file_path, dest_path), daemon=True)
tracker: ProcessTracker = app.config["TRACKER"]
tracker.add(file, ProcessTrackerObject(process))
process.start() # start the process on another thread
print(f"Process started on {file}")
return "Success: file exists.", 202 # indicate processing has started
else:
print(f"Process started on {file}")
response = await utils.run_sync(start_process)(file, dest_path)
return response


@app.route("/process_status", methods=["GET"])
async def return_status():
if not is_stateless: # only enable this route if we're running in stateless mode
if not app.config["IS_STATELESS"]: # only enable this route if we're running in stateless mode
tracker: ProcessTracker = app.config["TRACKER"]
process = tracker.get_process(request.args["filename"])
if process is None:
return "Process does not exist", 404 # shouldn't ever happen, but just in case

if process.process_is_alive():
return "false", 200 # return false to the request for "is the video finished processing"
else:
Expand Down Expand Up @@ -81,7 +78,6 @@ async def cancel_process():
os.remove(f)

return "Success", 200

else:
print("Not running in stateless mode, returning 501")
return "", 501
Expand All @@ -93,23 +89,48 @@ async def return_health():


@app.before_serving
async def lifespan():
prune: mp.Process = mp.Process(target=tracker.main)
async def before():
# Initiate process tracker
app.config["TRACKER"] = ProcessTracker.get_instance()
app.config["PROCESSOR"] = VideoProcessor.get_instance()
app.config["CLEANUP_DELAY"] = 3 # 3s delay
app.config["INPUT_DIR"] = get_env("PRIVACYPAL_INPUT_VIDEO_DIR", "/opt/privacypal/input_videos")
app.config["OUTPUT_DIR"] = get_env("PRIVACYPAL_OUTPUT_VIDEO_DIR", "/opt/privacypal/output_videos")
app.config["IS_STATELESS"] = get_env("PRIVACYPAL_IS_STATELESS", "true").lower() == "true"
app.config["ENVIRONMENT"] = get_env("ENVIRONMENT", "production")

tracker: ProcessTracker = app.config["TRACKER"]

# Launch periodic prune subprocess
prune: mp.Process = mp.Process(target=tracker.main, daemon=True)
prune.start()
old_int_handler = signal.getsignal(signal.SIGINT)
old_term_handler = signal.getsignal(signal.SIGTERM)

# Overwrite SIGINT and SIGTERM handler to terminate subprocesses
old_handlers = {
signal.SIGINT: signal.getsignal(signal.SIGINT),
signal.SIGTERM: signal.getsignal(signal.SIGTERM)
}

def process_cleanup(_signal, _stack):
tracker.terminate_processes()
prune.kill()
while prune.is_alive() or tracker.is_any_alive():
print("Waiting on process to be terminate")
time.sleep(3)
print("All sub-processes terminated")
if (_signal == signal.SIGTERM):
old_term_handler(_signal, _stack)
elif (_signal == signal.SIGINT):
old_int_handler(_signal, _stack)
# FIXME: Workaround pytest pid issues
if app.config["ENVIRONMENT"] == "testing" or mp.current_process().name == "MainProcess": # Only on main process
# Terminate all video processing processes
tracker.terminate_processes()
# Kill prune process
prune.kill()
# Ensure all subprocesses are terminated/killed
while prune.is_alive() or tracker.is_any_alive():
print(f"Sub-processes are still running. Retry in {app.config['CLEANUP_DELAY']}s.")
time.sleep(app.config["CLEANUP_DELAY"])
print("All subprocesses are terminated.")

# Call other handlers
try:
old_handler = old_handlers[_signal]
if callable(old_handler):
old_handler(_signal, _stack)
except Exception:
pass

signal.signal(signal.SIGINT, process_cleanup)
signal.signal(signal.SIGTERM, process_cleanup)
21 changes: 21 additions & 0 deletions app/video-processing/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[flake8]
ignore = D203
max-line-length = 200
exclude =
__pycache__

[tool:pytest]
testpaths = .
env =
PRIVACYPAL_INPUT_VIDEO_DIR={PWD}/samples
PRIVACYPAL_OUTPUT_VIDEO_DIR={PWD}/samples
AWS_ACCESS_KEY_ID=some-key-id
AWS_SECRET_ACCESS_KEY=some-access-key
AWS_SESSION_TOKEN=some-session-token
AWS_DEFAULT_REGION=ca-central-1
PRIVACYPAL_IS_STATELESS=false
ENVIRONMENT=testing
log_cli = true
log_cli_level = INFO
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
log_cli_date_format = "%Y-%m-%d %H:%M:%S
Loading

0 comments on commit 43aba3e

Please sign in to comment.