Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging webrtc statistics on host received over datachannel #141

Merged
merged 17 commits into from
Apr 15, 2024
Merged
Prev Previous commit
Next Next commit
Move WebRTC Statistics to metrics.py
  • Loading branch information
ehfd authored Apr 14, 2024
commit abae13645fdf0082f27041c59a14e1c030cf3763
15 changes: 13 additions & 2 deletions src/selkies_gstreamer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ def main():
parser.add_argument('--cursor_size',
default=os.environ.get('SELKIES_CURSOR_SIZE', os.environ.get('XCURSOR_SIZE', '-1')),
help='Cursor size in points for the local cursor, set instead XCURSOR_SIZE without of this argument to configure the cursor size for both the local and remote cursors')
parser.add_argument('--enable_webrtc_csv',
default=os.environ.get('SELKIES_ENABLE_WEBRTC_CSV, 'false'),
help='Enable WebRTC Statistics CSV dumping to the directory --webrtc_statistics_dir with filenames selkies-stats-video-[timestamp].csv and selkies-stats-audio-[timestamp].csv')
parser.add_argument('--webrtc_csv_dir',
default=os.environ.get('SELKIES_WEBRTC_CSV_DIR, '/tmp'),
help='Directory to save WebRTC Statistics CSV from client with filenames selkies-stats-video-[timestamp].csv and selkies-stats-audio-[timestamp].csv')
parser.add_argument('--enable_metrics_http',
default=os.environ.get('SELKIES_ENABLE_METRICS_HTTP', 'false'),
help='Enable the Prometheus HTTP metrics port')
Expand Down Expand Up @@ -471,7 +477,8 @@ def main():

# Initialize metrics server.
using_metrics_http = args.enable_metrics_http.lower() == 'true'
metrics = Metrics(int(args.metrics_http_port))
using_webrtc_csv = args.enable_webrtc_csv.lower() == 'true'
metrics = Metrics(int(args.metrics_http_port), using_webrtc_csv)

# Initialize the signalling client
using_https = args.enable_https.lower() == 'true'
Expand Down Expand Up @@ -727,6 +734,9 @@ def enable_resize_handler(enabled, enable_res):
# Send client latency to metrics
webrtc_input.on_client_latency = lambda latency_ms: metrics.set_latency(latency_ms)

# Send WebRTC stats to metrics
webrtc_input.on_client_webrtc_stats = lambda webrtc_stat_type, webrtc_stats: metrics.set_webrtc_stats(webrtc_stat_type, webrtc_stats)

# Initialize GPU monitor
gpu_mon = GPUMonitor(enabled=args.encoder.startswith("nv"))

Expand Down Expand Up @@ -828,7 +838,8 @@ def mon_rtc_config(stun_servers, turn_servers, rtc_config):
loop.run_in_executor(None, lambda: system_mon.start())

while True:
webrtc_input.initialize_webrtc_stats_files()
if using_webrtc_csv:
metrics.initialize_webrtc_csv_file(args.webrtc_csv_dir)
asyncio.ensure_future(app.handle_bus_calls(), loop=loop)
asyncio.ensure_future(audio_app.handle_bus_calls(), loop=loop)

Expand Down
140 changes: 138 additions & 2 deletions src/selkies_gstreamer/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
# limitations under the License.

from prometheus_client import start_http_server, Summary
from prometheus_client import Gauge, Histogram
from prometheus_client import Gauge, Histogram, Info
from datetime import datetime
import csv
import json
import logging
import random
import time
Expand All @@ -30,13 +33,19 @@
FPS_HIST_BUCKETS = (0, 20, 40, 60)

class Metrics:
def __init__(self, port=8000):
def __init__(self, port=8000, using_webrtc_csv=False):
self.port = port

self.fps = Gauge('fps', 'Frames per second observed by client')
self.fps_hist = Histogram('fps_hist', 'Histogram of FPS observed by client', buckets=FPS_HIST_BUCKETS)
self.gpu_utilization = Gauge('gpu_utilization', 'Utilization percentage reported by GPU')
self.latency = Gauge('latency', 'Latency observed by client')
self.webrtc_statistics = Info('webrtc_statistics', 'WebRTC Statistics from the client')
self.using_webrtc_csv = using_webrtc_csv
self.stats_video_file_path = None
self.stats_audio_file_path = None
self.prev_stats_video_header_len = None
self.prev_stats_audio_header_len = None

def set_fps(self, fps):
self.fps.set(fps)
Expand All @@ -51,6 +60,133 @@ def set_latency(self, latency_ms):
def start_http(self):
start_http_server(self.port)

def set_webrtc_stats(self, webrtc_stat_type, webrtc_stats):
webrtc_stats_obj = json.loads(webrtc_stats)
if self.using_webrtc_csv:
if webrtc_stat_type == "_stats_audio":
self.write_webrtc_stats_csv(webrtc_stats_obj, self.stats_audio_file_path)
else:
self.write_webrtc_stats_csv(webrtc_stats_obj, self.stats_video_file_path)
self.webrtc_statistics.info(webrtc_stats_obj)

def write_webrtc_stats_csv(self, obj_list, file_path):
"""Writes the WebRTC statistics to a CSV file.

Arguments:
obj_list {[list of object]} -- list of Python objects/dictionary
"""

dt = datetime.now()
timestamp = dt.strftime("%d/%B/%Y:%H:%M:%S")
try:
with open(file_path, 'a+') as stats_file:
csv_writer = csv.writer(stats_file, quotechar='"')

# Prepare the data
headers = ["timestamp"]
for obj in obj_list:
headers.extend(list(obj.keys()))
values = [timestamp]
for obj in obj_list:
values.extend(['"{}"'.format(val) if isinstance(val, str) and ';' in val else val for val in obj.values()])

if 'audio' in file_path:
# Audio stats
if self.prev_stats_audio_header_len == None:
csv_writer.writerow(headers)
csv_writer.writerow(values)
self.prev_stats_audio_header_len = len(headers)
elif self.prev_stats_audio_header_len == len(headers):
csv_writer.writerow(values)
else:
# We got new fields so update the data
self.update_webrtc_stats_csv(file_path, headers, values)
self.prev_stats_audio_header_len = len(headers)
else:
# Video stats
if self.prev_stats_video_header_len == None:
csv_writer.writerow(headers)
csv_writer.writerow(values)
self.prev_stats_video_header_len = len(headers)
elif self.prev_stats_video_header_len == len(headers):
csv_writer.writerow(values)
else:
# We got new fields so update the data
self.update_webrtc_stats_csv(file_path, headers, values)
self.prev_stats_video_header_len = len(headers)

except Exception as e:
logger.error("writing WebRTC Statistics to CSV file: " + str(e))

def update_webrtc_stats_csv(self, file_path, headers, values):
"""Copies data from one CSV file to another to facilite dynamic updates to the data structure
by handling empty values and appending new data.
"""
prev_headers = None
prev_values = []

try:
with open(file_path, 'r') as stats_file:
csv_reader = csv.reader(stats_file, delimiter=',')

# Fetch all existing data
header_indicator = 0
for row in csv_reader:
if header_indicator == 0:
prev_headers = row
header_indicator += 1
else:
prev_values.append(row)

i, j = 0, 0
while i < len(headers):
if headers[i] != prev_headers[j]:
# If there is a mismatch update all previous rows with a placeholder to represent an empty value, using `-1` here
for row in prev_values:
row.insert(i, -1)
i += 1
else:
i += 1
j += 1

# When new files are at the end
while j < i - 1:
for row in prev_values:
row.insert(j, -1)
j += 1

# Validation check to confirm modified rows are of same length
if len(prev_values[0]) != len(values):
logger.warn("There's a mismatch; columns could be misaligned with headers")

# Purge existing file
if os.path.exists(file_path):
os.remove(file_path)
else:
logger.warn("File {} doesn't exist to purge".format(file_path))

# create a new file with updated data
with open(file_path, "a") as stats_file:
csv_writer = csv.writer(stats_file)

csv_writer.writerow(headers)
csv_writer.writerows(prev_values)
csv_writer.writerow(values)

logger.info("WebRTC Statistics file {} created with updated data".format(stats_file))
except Exception as e:
logger.error("writing WebRTC Statistics to CSV file: " + str(e))

def initialize_webrtc_csv_file(self, webrtc_stats_dir='/tmp'):
"""Initializes the WebRTC Statistics file upon every new WebRTC connection
"""
dt = datetime.now()
timestamp = dt.strftime("%Y-%m-%d:%H:%M:%S")
self.stats_video_file_path = '{}/selkies-stats-video-{}.csv'.format(webrtc_stats_dir, timestamp)
self.stats_audio_file_path = '{}/selkies-stats-audio-{}.csv'.format(webrtc_stats_dir, timestamp)
self.prev_stats_video_header_len = None
self.prev_stats_audio_header_len = None

if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)

Expand Down
140 changes: 5 additions & 135 deletions src/selkies_gstreamer/webrtc_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
from PIL import Image
from gamepad import SelkiesGamepad

from datetime import datetime
import csv
import json

import logging
logger = logging.getLogger("webrtc_input")
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -117,10 +113,6 @@ def __init__(self, uinput_mouse_socket_path="", js_socket_path="", enable_clipbo
self.button_mask = 0

self.ping_start = None
self.stats_video_file_path = None
self.stats_audio_file_path = None
self.prev_stats_video_header_len = None
self.prev_stats_audio_header_len = None

self.on_video_encoder_bit_rate = lambda bitrate: logger.warn(
'unhandled on_video_encoder_bit_rate')
Expand All @@ -146,6 +138,8 @@ def __init__(self, uinput_mouse_socket_path="", js_socket_path="", enable_clipbo
'unhandled on_ping_response')
self.on_cursor_change = lambda msg: logger.warn(
'unhandled on_cursor_change')
self.on_webrtc_stats = lambda msg: logger.warn(
'unhandled on_webrtc_stats')

def __keyboard_connect(self):
self.keyboard = pynput.keyboard.Controller()
Expand Down Expand Up @@ -728,134 +722,10 @@ def on_message(self, msg):
logger.error(
"failed to parse latency report from client" + str(toks))
elif toks[0] == "_stats_video" or toks[0] == "_stats_audio":
# Webrtc Statistics API data from client
# WebRTC Statistics API data from client
try:
stats_obj = json.loads(",".join(toks[1:]))
if toks[0] == "_stats_video":
self.write_webrtc_stats(stats_obj, self.stats_video_file_path)
else:
self.write_webrtc_stats(stats_obj, self.stats_audio_file_path)
self.on_webrtc_stats(toks[0], ",".join(toks[1:]))
except:
logger.error("failed to deserialize JSON object to python object")
logger.error("failed to parse WebRTC Statistics JSON object")
else:
logger.info('unknown data channel message: %s' % msg)

def write_webrtc_stats(self, obj_list, file_path):
"""Writes the webrtc statistics to a CSV file.

Arguments:
obj_list {[list of object]} -- list of python objects/dict.
"""

dt = datetime.now()
timestamp = dt.strftime("%d/%B/%Y:%H:%M:%S")
try:
with open(file_path, 'a+') as stats_file:
csv_writer = csv.writer(stats_file, quotechar='"')

# Prepare the data
headers = ["timestamp"]
for obj in obj_list:
headers.extend(list(obj.keys()))
values = [timestamp]
for obj in obj_list:
values.extend(['"{}"'.format(val) if isinstance(val, str) and ';' in val else val for val in obj.values()])

# Video stats
if 'video' in file_path:
if self.prev_stats_video_header_len == None:
csv_writer.writerow(headers)
csv_writer.writerow(values)
self.prev_stats_video_header_len = len(headers)
elif self.prev_stats_video_header_len == len(headers):
csv_writer.writerow(values)
else:
# We got new fields so update the data
self.update_the_stats(file_path, headers, values)
self.prev_stats_video_header_len = len(headers)
else:
# Audio stats
if self.prev_stats_audio_header_len == None:
csv_writer.writerow(headers)
csv_writer.writerow(values)
self.prev_stats_audio_header_len = len(headers)
elif self.prev_stats_audio_header_len == len(headers):
csv_writer.writerow(values)
else:
# We got new fields so update the data
self.update_the_stats(file_path, headers, values)
self.prev_stats_audio_header_len = len(headers)


except Exception as e:
logger.error("writing webrtc stats to csv file: " + str(e))

def update_the_stats(self, file_path, headers, values):
"""Copies data from one csv file to another to facilite dynamic updates to the data structure
by handling empty values and appending new data.
"""
prev_headers = None
prev_values = []

try:
with open(file_path, 'r') as stats_file:
csv_reader = csv.reader(stats_file, delimiter=',')

# Get all existing data
header_indicator = 0
for row in csv_reader:
if header_indicator == 0:
prev_headers = row
header_indicator += 1
else:
prev_values.append(row)

i, j = 0, 0
while i < len(headers):
if headers[i] != prev_headers[j]:
# If there's a mismatch then we encountered a new filed, so update all previous rows with
# a placeholder to represent an empty value, using `-1` here
for row in prev_values:
row.insert(i, -1)
i += 1
else:
i += 1
j += 1

# if the new fileds are the end then take care of those as well
while j<i-1:
for row in prev_values:
row.insert(j, -1)
j += 1

# Just some validation check to see if modified rows are of same lenght as new
if len(prev_values[0]) != len(values):
logger.warn("There's a mismatch; the columns could be misaligned with headers")

# Purge the existing file
if os.path.exists(file_path):
os.remove(file_path)
else:
logger.warn("File {} doesn't exist to purge".format(file_path))

# create a new file with updated data
with open(file_path, 'a') as stats_file:
csv_writer = csv.writer(stats_file)

csv_writer.writerow(headers)
csv_writer.writerows(prev_values)
csv_writer.writerow(values)

logger.info("File {} created with updated data".format(stats_file))
except Exception as e:
raise e

def initialize_webrtc_stats_files(self):
"""Initializes the Webrtc Statistics file upon every new webrtc connection
"""
dt = datetime.now()
timestamp = dt.strftime("%Y-%m-%d:%H:%M:%S")
self.stats_video_file_path = '/tmp/stats-video-{}.csv'.format(timestamp)
self.stats_audio_file_path = '/tmp/stats-audio-{}.csv'.format(timestamp)
self.prev_stats_video_header_len = None
self.prev_stats_audio_header_len = None