Skip to content
110 changes: 100 additions & 10 deletions occameracontrol/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from requests.auth import HTTPDigestAuth
from typing import Optional

from occameracontrol.agent import Agent
from agent import Agent
from occameracontrol.metrics import register_camera_move, \
register_camera_expectation
register_camera_expectation, register_camera_status


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,13 +75,78 @@ def __str__(self) -> str:
'''
return f"'{self.agent.agent_id}' @ '{self.url}'"

def activate_camera(self, on=True):
def is_on(self) -> bool:
"""Retrieve whether or not the camera is in Standby.
For Panasonic camera AW-UE70:
0 if Standby
1 if On
3 if Transferring from Standby to On
TODO:
- Which panasonic models do we have?
- Maybe there is a difference for other models?
--> Works for the two models that we have

For Sony camera:
0 if Standby
1 if On

-1 if Something went wrong
"""
state = False
if self.type == CameraType.panasonic:
url = f'{self.url}/cgi-bin/aw_ptz'
command = "#O"
params = {'cmd': command, 'res': 1}
auth = (self.user, self.password) \
if self.user and self.password else None
logger.debug('GET %s with params: %s', url, params)
response = requests.get(url, auth=auth, params=params, timeout=5)
response.raise_for_status()
state_int = int(response.content.decode().removeprefix('p'))
while state_int == 3:
# Escape the transition from standby to on
time.sleep(3)
response = requests.get(
url,
auth=auth,
params=params,
timeout=5)
response.raise_for_status()
state = bool(state_int)

if self.type == CameraType.sony:
url = f'{self.url}/command/inquiry.cgi'
params = {'inq': 'system'}
headers = {'referer': f'{self.url}/'}
auth = HTTPDigestAuth(self.user, self.password) \
if self.user and self.password else None
logger.debug('GET %s with params: %s', url, params)
response = requests.get(url,
auth=auth,
headers=headers,
params=params,
timeout=5)
response.raise_for_status()
values = response.content.decode().split("&")
for v in values:
if "Power" in v:
if v.removeprefix("Power=")[1] == 'on':
state = True
else:
state = False

register_camera_status(self.url, int(state))
return state

def set_power(self, turn_on=True):
"""Activate the camera or put it into standby mode.
:param bool on: camera should be online or standby (default: True)
:param bool on: camera should be turned on (True)
or set to standby (False) (default: True)
"""
if self.type == CameraType.panasonic:
url = f'{self.url}/cgi-bin/aw_ptz'
command = '#On' if on else '#Of'
# If the camera is in Standby, turn it on
command = '#On' if not turn_on else '#Of'
params = {'cmd': command, 'res': 1}
auth = (self.user, self.password) \
if self.user and self.password else None
Expand All @@ -91,7 +156,8 @@ def activate_camera(self, on=True):

elif self.type == CameraType.sony:
url = f'{self.url}/command/main.cgi'
command = 'on' if on else 'standby'
# If the camera is in Standby, turn it on
command = 'on' if not turn_on else 'standby'
params = {'System': command}
headers = {'referer': f'{self.url}/'}
auth = HTTPDigestAuth(self.user, self.password) \
Expand All @@ -104,6 +170,8 @@ def activate_camera(self, on=True):
timeout=5)
response.raise_for_status()

self.is_on()

def move_to_preset(self, preset: int):
'''Move the PTZ camera to the specified preset position
'''
Expand Down Expand Up @@ -170,17 +238,39 @@ def update_position(self):
logger.info('[%s] Event `%s` started', agent_id, event.title)
logger.info('[%s] Moving to preset %i', agent_id,
self.preset_active)
self.activate_camera()
logger.debug('[%s] Retrieving the camera state', agent_id)
if not self.is_on():
self.set_power()
time.sleep(10)

# To update the metrics
self.is_on()

self.move_to_preset(self.preset_active)
else: # No active event
if self.position != self.preset_inactive:
logger.info('[%s] Returning to preset %i', agent_id,
self.preset_inactive)
self.activate_camera()
self.move_to_preset(self.preset_inactive)
logger.debug('[%s] Retrieving the camera state', agent_id)
if not self.is_on():
self.set_power()
time.sleep(10)

# To update the metrics
self.is_on()

self.move_to_preset(self.preset_inactive)
# Regular update
if time.time() - self.last_updated >= self.update_frequency:
logger.info('[%s] Re-sending preset %i to camera', agent_id,
self.position)
self.activate_camera()

logger.debug('[%s] Retrieving the camera state', agent_id)
if not self.is_on():
self.set_power()
time.sleep(10)

# To update the metrics
self.is_on()

self.move_to_preset(self.position)
14 changes: 14 additions & 0 deletions occameracontrol/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
'camera_position_expected',
'The position (preset number) a camera should be in',
('camera',))
camera_is_on = Gauge(
'camera_status',
'Whether the camera is On (1.0) or Standby (0.0)',
('camera',))


class RequestErrorHandler():
Expand Down Expand Up @@ -120,6 +124,16 @@ def register_camera_expectation(camera: str, position: int):
camera_position_expected.labels(camera).set(position)


def register_camera_status(camera: str, status: int):
'''Update metrics for the status of the camera. This ensures the (power)
state of the camera is available as part of the metrics.

:param camera: Camera identified
:param status: 1.0 if camera is 'on', 0.0 if 'standby'
'''
camera_is_on.labels(camera).set(status)


def start_metrics_exporter():
'''Start the web server for the metrics exporter endpoint if it is enabled
in the configuration.
Expand Down
Loading