Skip to content

Save acquisition log for camera sessions (issue #782) #785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changelog
8.28.0
------
* added: `_iblrig_tasks_spontaneousBpod` - a spontaneous task that includes Bpod spacers
* added: save acquisistion log for camera sessions
* added: save ambient data to Parquet file
* added: save task version to settings JSON
* added: check for remote data folder during hardware validation
Expand Down
187 changes: 159 additions & 28 deletions iblrig/test/test_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import sys
import tempfile
import unittest
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from pathlib import Path
from unittest import mock
from unittest.mock import ANY, MagicMock, call, patch
from unittest.mock import ANY, DEFAULT, MagicMock, call, patch

import numpy as np

Expand Down Expand Up @@ -40,6 +40,8 @@ def test_download_from_alyx_or_flir(self, mock_os_rename, mock_hashfile, mock_aw
mock_os_rename.assert_called_once_with(Path('mocked_tmp_file'), expected_out_file)


@patch('iblrig.video.HAS_PYSPIN', True)
@patch('iblrig.video.HAS_SPINNAKER', True)
class BaseCameraTest(BaseTestCases.CommonTestTask):
"""A base class for camera hardware test fixtures."""

Expand All @@ -65,8 +67,6 @@ def get_task_kwargs(self, tmpdir=True):
class TestCameraSession(BaseCameraTest):
"""Test for iblrig.video.CameraSession class."""

@patch('iblrig.video.HAS_PYSPIN', True)
@patch('iblrig.video.HAS_SPINNAKER', True)
@patch('builtins.input')
@patch('iblrig.video.call_bonsai')
@patch('iblrig.video_pyspin.enable_camera_trigger')
Expand Down Expand Up @@ -100,6 +100,20 @@ def test_run_video_session(self, enable_camera_trigger, call_bonsai, _):
call(workflows.recording, expected_pars, debug=False, wait=False),
]
call_bonsai.assert_has_calls(expected)
# Check log file initiated and moved to session folder
log_file = raw_data_folder.joinpath('_ibl_log.info-acquisition.log')
self.assertTrue(log_file.exists() and log_file.stat().st_size > 0)
# When file logging fails, there should be a warning
with (
patch('iblrig.video.setup_logger', side_effect=(DEFAULT, OSError)),
self.assertWarns(UserWarning, msg='Failed to set up logs'),
):
session = video.CameraSession(**self.task_kwargs)
session = video.CameraSession(**self.task_kwargs)
with patch.object(session, '_copy_log_to_session', side_effect=OSError), self.assertLogs('iblrig', level='ERROR'):
session.run()
# Check the original log file still exists
self.assertTrue(any(Path(tempfile.gettempdir()).glob('iblrig_logs/????????-??????_camera-session.log')))

# Test validation
self.assertRaises(NotImplementedError, video.CameraSession, append=True)
Expand All @@ -112,6 +126,7 @@ class TestCameraSessionNetworked(unittest.IsolatedAsyncioTestCase, BaseCameraTes

def setUp(self):
super().setUp()
del self.task_kwargs['subject'] # not used in these tests
# Set up keyboad input mock - simply return empty string as await appears to be blocking
self.keyboard = ''
read_stdin = patch('iblrig.video.read_stdin')
Expand All @@ -126,6 +141,10 @@ async def _stdin():
async def asyncSetUp(self):
self.communicator = mock.AsyncMock(spec=video.net.app.EchoProtocol)
self.communicator.is_connected = True
self.session = video.CameraSessionNetworked(**self.task_kwargs)
# These two lines replace a call to `session.listen`
self.session.communicator = self.communicator
self.session._status = net.base.ExpStatus.CONNECTED

# Mock the call_bonsai_async function to return an async subprocess mock that awaits a
# future that we can access via self.bonsai_subprocess_future
Expand All @@ -141,23 +160,14 @@ async def _wait():
self.call_bonsai_async.return_value = mock.AsyncMock(spec=asyncio.subprocess.Process)
self.call_bonsai_async.return_value.wait.side_effect = _wait

@patch('iblrig.video.HAS_PYSPIN', True)
@patch('iblrig.video.HAS_SPINNAKER', True)
@patch('iblrig.video.call_bonsai')
@patch('iblrig.video_pyspin.enable_camera_trigger')
async def test_run_video_session(self, enable_camera_trigger, call_bonsai):
"""Test iblrig.video.CameraSessionNetworked.run method."""
# Some test hardware settings
task_kwargs = self.task_kwargs
del task_kwargs['subject']
config = task_kwargs['hardware_settings']['device_cameras']['default']
config = self.task_kwargs['hardware_settings']['device_cameras']['default']
workflows = config['BONSAI_WORKFLOW']

session = video.CameraSessionNetworked(**task_kwargs)
# These two lines replace a call to `session.listen`
session.communicator = self.communicator
session._status = net.base.ExpStatus.CONNECTED
self.assertEqual(session.config, config)
self.assertEqual(self.session.config, config)

def _end_bonsai_proc():
"""Return args with added side effect of signalling Bonsai subprocess termination."""
Expand All @@ -170,41 +180,41 @@ def _end_bonsai_proc():
match call_number: # before yielding each message, make some assertions on the current state of the session
# Before any messages processed
case 0:
self.assertIs(session.status, net.base.ExpStatus.CONNECTED)
self.assertIsNone(session.exp_ref)
self.assertIs(self.session.status, net.base.ExpStatus.CONNECTED)
self.assertIsNone(self.session.exp_ref)
# After info message processed
case 1:
self.assertIs(session.status, net.base.ExpStatus.CONNECTED)
self.assertIs(self.session.status, net.base.ExpStatus.CONNECTED)
# After init message processed
case 2:
self.assertIs(session.status, net.base.ExpStatus.INITIALIZED)
self.assertEqual(f'{date.today()}_1_foo', session.exp_ref)
bonsai_task = next((t for t in session._async_tasks if t.get_name() == 'bonsai'), None)
self.assertIs(self.session.status, net.base.ExpStatus.INITIALIZED)
self.assertEqual(f'{date.today()}_1_foo', self.session.exp_ref)
bonsai_task = next((t for t in self.session._async_tasks if t.get_name() == 'bonsai'), None)
self.assertIsNotNone(bonsai_task, 'failed to add named bonsai wait task to task set')
self.assertFalse(bonsai_task.done(), 'bonsai task unexpectedly cancelled')
self.call_bonsai_async.return_value.wait.assert_awaited_once()
# After start message processed
case 3:
self.assertIs(session.status, net.base.ExpStatus.RUNNING)
self.assertIs(self.session.status, net.base.ExpStatus.RUNNING)
# Simulate user ending bonsai subprocess
self.bonsai_subprocess_future.set_result(0)
# End loop by simulating communicator object disconnecting
self.communicator.is_connected = False
# case _:
# self.assertIs(session.status, net.base.ExpStatus.STOPPED)
# self.assertIs(self.session.status, net.base.ExpStatus.STOPPED)
yield msg

reponses = _end_bonsai_proc()
self.communicator.on_event.side_effect = lambda evt: next(reponses)
await session.run()
responses = _end_bonsai_proc()
self.communicator.on_event.side_effect = lambda evt: next(responses)
await self.session.run()
self.communicator.close.assert_called_once()
self.communicator.on_event.assert_awaited_with(net.base.ExpMessage.any())
self.assertEqual(net.base.ExpStatus.STOPPED, session.status)
self.assertEqual(net.base.ExpStatus.STOPPED, self.session.status)

# Validate calls
expected = [call(enable=False), call(enable=True), call(enable=False)]
enable_camera_trigger.assert_has_calls(expected)
raw_data_folder = session.paths['SESSION_RAW_DATA_FOLDER']
raw_data_folder = self.session.paths['SESSION_RAW_DATA_FOLDER']
self.assertTrue(str(raw_data_folder).startswith(str(self.tmp)))
expected_pars = {
'LeftCameraIndex': 1,
Expand All @@ -218,6 +228,127 @@ def _end_bonsai_proc():
workflows.setup, {'LeftCameraIndex': 1, 'RightCameraIndex': 1}, debug=False, wait=True
)
self.call_bonsai_async.assert_awaited_once_with(workflows.recording, expected_pars, debug=False)
# Check log file initiated and moved to session folder
log_file = raw_data_folder.joinpath('_ibl_log.info-acquisition.log')
self.assertTrue(log_file.exists() and log_file.stat().st_size > 0)

@patch('iblrig.video.call_bonsai')
@patch('iblrig.video_pyspin.enable_camera_trigger')
async def test_log_move_error(self, *_):
"""Check handles log move error without raising."""
handlers = self.session.logger.handlers
handler = next(h for h in handlers if getattr(h, 'baseFilename', '').endswith('_camera-session.log'))
temp_log_file = Path(handler.baseFilename)
self.assertTrue(temp_log_file.exists() and temp_log_file.is_relative_to(tempfile.gettempdir()))

def _end_bonsai_proc():
"""Return args with added side effect of signalling Bonsai subprocess termination."""
addr = '192.168.0.5:99998'
info_msg = ((net.base.ExpStatus.CONNECTED, {'subject_name': 'foo'}), addr, net.base.ExpMessage.EXPINFO)
init_msg = ([{'exp_ref': f'{date.today()}_1_foo'}], addr, net.base.ExpMessage.EXPINIT)
start_msg = ((f'{date.today()}_1_foo', {}), addr, net.base.ExpMessage.EXPSTART)
status_msg = (net.base.ExpStatus.RUNNING, addr, net.base.ExpMessage.EXPSTATUS)
for msg in (info_msg, init_msg, start_msg, status_msg): # noqa: UP028
yield msg
# End loop by simulating communicator object disconnecting
self.communicator.is_connected = False
yield status_msg

responses = _end_bonsai_proc()
self.communicator.on_event.side_effect = lambda evt: next(responses)

with patch.object(self.session, '_copy_log_to_session', side_effect=OSError), self.assertLogs('iblrig', level='ERROR'):
await self.session.run()
# Check the original log file still exists
self.assertTrue(temp_log_file.exists() and temp_log_file.stat().st_size > 0)

@patch('iblrig.video.call_bonsai')
@patch('iblrig.video_pyspin.enable_camera_trigger')
async def test_error_handling(self, *_):
"""Check handles message errors when running."""

def _message_mock_1():
"""Mock two init messages in a row, both with different expRefs."""
addr = '192.168.0.5:99998'
for ref in ('2020-01-01_1_foo', '2020-01-01_1_bar'):
yield [{'exp_ref': ref}], addr, net.base.ExpMessage.EXPINIT

responses_1 = _message_mock_1()
self.communicator.on_event.side_effect = lambda evt: next(responses_1)

with self.assertRaises(AssertionError) as cm:
await self.session.run()
self.assertEqual(str(cm.exception), 'expected 2025-03-12_1_foo, got 2025-03-12_1_bar')
# Check the communicator was closed
self.communicator.close.assert_called_once()
self.assertFalse(any(self.session._async_tasks))

# When the session is running, the communicator should remain open
await self.asyncSetUp() # reset the session

def _message_mock_2():
"""Mock two init messages in a row, both with different expRefs."""
addr = '192.168.0.5:99998'
yield (f'{date.today()}_1_foo', {}), addr, net.base.ExpMessage.EXPSTART
while True: # sometimes the on_event method is awaited again before the bonsai task
yield [{'exp_ref': '2020-01-01_1_bar'}], addr, net.base.ExpMessage.EXPINIT
if not self.bonsai_subprocess_future.done():
self.bonsai_subprocess_future.set_result(0)
self.communicator.is_connected = False

responses_2 = _message_mock_2()
self.communicator.on_event.side_effect = lambda evt: next(responses_2)
# Should catch and log error instead of raising
with self.assertLogs(self.session.logger.name, 'ERROR') as cm:
await self.session.run()
record = next((r.getMessage() for r in cm.records if r.levelno == 40), '')
self.assertRegex(record, r'2020-01-01_1_bar received; already running [\d\-_]+foo')

async def test_process_keyboard_input(self):
"""Test iblrig.video.CameraSessionNetworked._process_keyboard_input method."""
# With blank input should simply return
with self.assertNoLogs(self.session.logger, 'INFO'):
await self.session._process_keyboard_input('')
self.communicator.close.assert_not_called()
# With unknown input should log error
with self.assertLogs(self.session.logger, 'ERROR'):
await self.session._process_keyboard_input('FOO')
self.communicator.close.assert_not_called()
# With STOP should log info and stop recording
with self.assertLogs(self.session.logger, 'INFO'), patch.object(self.session, 'stop_recording') as stop:
await self.session._process_keyboard_input('STOP')
stop.assert_awaited_once()
self.communicator.close.assert_not_called()
# With START should log info and start recording
assert self.session.exp_ref is None
with self.assertLogs(self.session.logger, 'ERROR'), patch.object(self.session, 'on_start') as start:
await self.session._process_keyboard_input('START')
start.assert_not_awaited()
self.session.session_info = dict(SUBJECT_NAME='foo', SESSION_START_TIME=datetime.now().isoformat(), SESSION_NUMBER=1)
assert self.session.exp_ref
with self.assertLogs(self.session.logger, 'INFO'), patch.object(self.session, 'on_start') as start:
await self.session._process_keyboard_input('START')
exp_ref = f'{date.today()}_1_foo'
start.assert_awaited_once_with([exp_ref, {}], None)
# With QUIT should log info and close communicator
with self.assertLogs(self.session.logger, 'INFO'), patch.object(self.session, 'stop_recording') as stop:
await self.session._process_keyboard_input('QUIT')
stop.assert_not_awaited()
self.communicator.close.assert_called()
self.session.bonsai_process = MagicMock()
self.session.bonsai_process.returncode = 0
await self.session._process_keyboard_input('QUIT')
stop.assert_not_awaited()
self.session.bonsai_process.returncode = None
await self.session._process_keyboard_input('QUIT')
stop.assert_awaited_once()
# With QUIT! should call close method
self.communicator.close.reset_mock()
with self.assertLogs(self.session.logger, 'INFO'), patch.object(self.session, 'close', wraps=self.session.close) as close:
await self.session._process_keyboard_input('QUIT!!!')
close.assert_called_once()
self.communicator.close.assert_called_once()
self.assertFalse(any(self.session._async_tasks))


class TestValidateVideo(unittest.TestCase):
Expand Down
Loading
Loading