Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions tests/unit/mindtrace/cluster/test_run_script_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,28 @@ def test_setup_environment_docker(self, mock_docker_env_class, worker, docker_jo
assert worker.env_manager == mock_docker_env
assert worker.container_id == "test-container-id"

@patch("mindtrace.cluster.workers.run_script_worker.DockerEnvironment")
@patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": "/host/creds.json"})
def test_setup_environment_docker_maps_gcp_credentials_volume(self, mock_docker_env_class, worker):
docker_job = {
"environment": {
"docker": {
"image": "python:3.11",
"volumes": {"GCP_CREDENTIALS": {"bind": "/creds.json", "mode": "ro"}},
}
}
}

mock_docker_env = Mock()
mock_docker_env.setup.return_value = "container-123"
mock_docker_env_class.return_value = mock_docker_env

worker.setup_environment(docker_job["environment"])

passed_volumes = mock_docker_env_class.call_args.kwargs["volumes"]
assert "GCP_CREDENTIALS" not in passed_volumes
assert passed_volumes["/host/creds.json"] == {"bind": "/creds.json", "mode": "ro"}

def test_setup_environment_invalid_config(self, worker):
"""Test environment setup with invalid configuration."""
job_dict = {"environment": {"invalid_env": {"some": "config"}}}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Focused tests for MockGenICamCameraBackend behavior."""

import pytest

from mindtrace.hardware.cameras.backends.genicam.mock_genicam_camera_backend import MockGenICamCameraBackend
from mindtrace.hardware.core.exceptions import CameraConfigurationError, CameraConnectionError, CameraNotFoundError


def test_available_cameras_formats():
cams = MockGenICamCameraBackend.get_available_cameras()
details = MockGenICamCameraBackend.get_available_cameras(include_details=True)
assert isinstance(cams, list)
assert "MOCK_KEYENCE_001" in cams
assert isinstance(details, dict)
assert details["MOCK_KEYENCE_001"]["vendor"] == "KEYENCE"


@pytest.mark.asyncio
async def test_initialize_missing_camera_raises():
cam = MockGenICamCameraBackend("does_not_exist")
with pytest.raises(CameraNotFoundError):
await cam.initialize()


@pytest.mark.asyncio
async def test_set_exposure_requires_initialization():
cam = MockGenICamCameraBackend("MOCK_KEYENCE_001")
with pytest.raises(CameraConnectionError):
await cam.set_exposure(1000)


@pytest.mark.asyncio
async def test_keyence_exposure_casts_to_int():
cam = MockGenICamCameraBackend("MOCK_KEYENCE_001", vendor="KEYENCE")
await cam.initialize()
await cam.set_exposure(1234.9)
assert cam.exposure_time == 1234.0


@pytest.mark.asyncio
async def test_exposure_out_of_range_raises():
cam = MockGenICamCameraBackend("MOCK_BASLER_001", vendor="BASLER")
await cam.initialize()
with pytest.raises(CameraConfigurationError):
await cam.set_exposure(2_000_000)


def test_invalid_buffer_count_rejected():
with pytest.raises(CameraConfigurationError):
MockGenICamCameraBackend("MOCK_KEYENCE_001", buffer_count=0)
Original file line number Diff line number Diff line change
Expand Up @@ -519,3 +519,34 @@ def _test_pil_image_with_mock(self):
assert isinstance(calib_data, CalibrationData)
assert calib_data.world_unit == "mm"
assert calib_data.H.shape == (3, 3)

def test_calibrate_checkerboard_multi_view_validates_lengths(self):
image = np.zeros((100, 100, 3), dtype=np.uint8)
with pytest.raises(CameraConfigurationError, match="must match number of positions"):
self.calibrator.calibrate_checkerboard_multi_view(images=[image], positions=[])

@patch("cv2.findChessboardCorners")
@patch.object(HomographyCalibrator, "calibrate_from_correspondences")
def test_calibrate_checkerboard_multi_view_combines_points(self, mock_from_correspondences, mock_find_corners):
image = np.zeros((80, 80, 3), dtype=np.uint8)
corners = np.array([[[1.0, 1.0]], [[2.0, 2.0]], [[3.0, 3.0]], [[4.0, 4.0]]], dtype=np.float32)
mock_find_corners.return_value = (True, corners)
mock_from_correspondences.return_value = CalibrationData(H=np.eye(3))

result = self.calibrator.calibrate_checkerboard_multi_view(
images=[image],
positions=[(10.0, 20.0)],
board_size=(2, 2),
square_width=5.0,
square_height=7.0,
refine_corners=False,
world_unit="mm",
)

assert isinstance(result, CalibrationData)
kwargs = mock_from_correspondences.call_args.kwargs
np.testing.assert_array_equal(
kwargs["world_points"], np.array([[10.0, 20.0], [15.0, 20.0], [10.0, 27.0], [15.0, 27.0]])
)
np.testing.assert_array_equal(kwargs["image_points"], corners.reshape(-1, 2))
assert kwargs["world_unit"] == "mm"
54 changes: 54 additions & 0 deletions tests/unit/mindtrace/hardware/cameras/homography/test_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import json

import numpy as np

from mindtrace.hardware.cameras.homography.data import CalibrationData, MeasuredBox


def test_calibration_data_save_and_load_roundtrip(tmp_path):
filepath = tmp_path / "nested" / "calibration.json"
original = CalibrationData(
H=np.eye(3),
camera_matrix=np.array([[1.0, 0, 2.0], [0, 1.0, 3.0], [0, 0, 1.0]]),
dist_coeffs=np.array([0.1, 0.2, 0.3]),
world_unit="cm",
plane_normal_camera=np.array([0.0, 0.0, 1.0]),
)

original.save(str(filepath))
assert filepath.exists()

loaded = CalibrationData.load(str(filepath))
assert np.array_equal(loaded.H, original.H)
assert np.array_equal(loaded.camera_matrix, original.camera_matrix)
assert np.array_equal(loaded.dist_coeffs, original.dist_coeffs)
assert np.array_equal(loaded.plane_normal_camera, original.plane_normal_camera)
assert loaded.world_unit == "cm"


def test_calibration_data_load_defaults_world_unit(tmp_path):
filepath = tmp_path / "calibration.json"
filepath.write_text(json.dumps({"H": [[1, 0, 0], [0, 1, 0], [0, 0, 1]]}))

loaded = CalibrationData.load(str(filepath))
assert loaded.world_unit == "mm"
assert loaded.camera_matrix is None
assert loaded.dist_coeffs is None
assert loaded.plane_normal_camera is None


def test_measured_box_to_dict_serializes_corners():
measured = MeasuredBox(
corners_world=np.array([[0.0, 0.0], [2.0, 0.0], [2.0, 1.0], [0.0, 1.0]]),
width_world=2.0,
height_world=1.0,
area_world=2.0,
unit="m",
)

as_dict = measured.to_dict()
assert as_dict["corners_world"] == [[0.0, 0.0], [2.0, 0.0], [2.0, 1.0], [0.0, 1.0]]
assert as_dict["width_world"] == 2.0
assert as_dict["height_world"] == 1.0
assert as_dict["area_world"] == 2.0
assert as_dict["unit"] == "m"
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def test_measurer_invalid_homography_shape(self):
with pytest.raises(CameraConfigurationError, match="Invalid homography matrix shape"):
HomographyMeasurer(invalid_calib)

def test_measurer_singular_homography_raises(self):
singular = np.array([[1.0, 0.0, 0.0], [2.0, 0.0, 0.0], [0.0, 0.0, 1.0]])
with pytest.raises(CameraConfigurationError, match="singular"):
HomographyMeasurer(CalibrationData(H=singular))

def test_unit_scale_conversion(self):
"""Test unit scaling conversion factors."""
# Test all supported unit conversions
Expand Down Expand Up @@ -422,3 +427,16 @@ def test_measurer_with_camera_parameters(self):
assert measured.width_world > 0
assert measured.height_world > 0
assert measured.unit == "mm"

def test_measure_distance_returns_expected_and_target_unit(self):
distance_mm, unit_mm = self.measurer.measure_distance((100.0, 50.0), (200.0, 50.0))
assert unit_mm == "mm"
assert distance_mm == pytest.approx(50.0)

distance_cm, unit_cm = self.measurer.measure_distance((100.0, 50.0), (200.0, 50.0), target_unit="cm")
assert unit_cm == "cm"
assert distance_cm == pytest.approx(5.0)

def test_measure_distance_validates_point_shape(self):
with pytest.raises(ValueError):
self.measurer.measure_distance((1.0, 2.0, 3.0), (2.0, 3.0))
28 changes: 28 additions & 0 deletions tests/unit/mindtrace/hardware/cameras/setup/test_setup_basler.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,34 @@ def test_installer_logging(self):
assert isinstance(installer.logger, logging.Logger)


class TestInstallerInternals:
"""More targeted tests for installer internals to improve branch coverage."""

def test_validate_package_rejects_small_file(self, tmp_path):
installer = PylonSDKInstaller()
package = tmp_path / "pylon_small.tar.gz"
package.write_bytes(b"x" * 1024)

info = {"min_size_mb": 100, "file_description": "x", "search_term": "x", "file_pattern": "x"}
assert installer._validate_package(package, info) is False

def test_validate_package_rejects_non_pylon_name(self, tmp_path):
installer = PylonSDKInstaller()
package = tmp_path / "camera_sdk.tar.gz"
package.write_bytes(b"x" * (120 * 1024 * 1024))

info = {"min_size_mb": 100, "file_description": "x", "search_term": "x", "file_pattern": "x"}
assert installer._validate_package(package, info) is False

def test_install_from_package_unsupported_platform(self, tmp_path):
installer = PylonSDKInstaller()
installer.platform = "Darwin"
package = tmp_path / "pylon.pkg"
package.write_text("dummy")

assert installer._install_from_package(package) is False


class TestTyperCommands:
"""Test Typer CLI commands."""

Expand Down
115 changes: 115 additions & 0 deletions tests/unit/mindtrace/hardware/cameras/setup/test_setup_basler_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Additional branch tests for Basler setup installer."""

from unittest.mock import Mock, patch

import pytest

from mindtrace.hardware.cameras.setup.setup_basler import PylonSDKInstaller


@pytest.fixture
def installer(tmp_path):
cfg = Mock()
cfg.paths.lib_dir = str(tmp_path)
hw = Mock()
hw.get_config.return_value = cfg

with patch("mindtrace.hardware.cameras.setup.setup_basler.get_hardware_config", return_value=hw):
inst = PylonSDKInstaller()
return inst


def test_prompt_for_file_quit(installer):
with patch("mindtrace.hardware.cameras.setup.setup_basler.typer.prompt", return_value="q"):
assert installer._prompt_for_file(installer.PLATFORM_INFO["Linux"]) is None


def test_prompt_for_file_accepts_valid_path(installer, tmp_path):
pkg = tmp_path / "pylon_ok.tar.gz"
pkg.write_bytes(b"x" * (150 * 1024 * 1024))

with patch("mindtrace.hardware.cameras.setup.setup_basler.typer.prompt", return_value=str(pkg)):
got = installer._prompt_for_file(installer.PLATFORM_INFO["Linux"])

assert got == pkg


def test_open_download_page_handles_browser_error(installer):
with patch("mindtrace.hardware.cameras.setup.setup_basler.webbrowser.open", side_effect=RuntimeError("boom")):
installer._open_download_page() # no raise


def test_install_from_package_dispatches_linux(installer, tmp_path):
installer.platform = "Linux"
pkg = tmp_path / "pylon.tar.gz"
pkg.write_text("x")

with patch.object(installer, "_install_linux", return_value=True) as m:
assert installer._install_from_package(pkg) is True
m.assert_called_once_with(pkg)


def test_install_from_package_dispatches_windows(installer, tmp_path):
installer.platform = "Windows"
pkg = tmp_path / "pylon.exe"
pkg.write_text("x")

with patch.object(installer, "_install_windows", return_value=True) as m:
assert installer._install_from_package(pkg) is True
m.assert_called_once_with(pkg)


def test_install_linux_installs_debs(installer, tmp_path):
installer.platform = "Linux"
installer.pylon_dir = tmp_path / "pylon"
pkg = tmp_path / "pylon_debs.tar.gz"
pkg.write_bytes(b"x")

deb1 = installer.pylon_dir / "a.deb"
deb2 = installer.pylon_dir / "sub" / "b.deb"
deb2.parent.mkdir(parents=True, exist_ok=True)
deb1.write_text("a")
deb2.write_text("b")

class DummyTar:
def __enter__(self):
return self

def __exit__(self, *args):
return False

def extractall(self, path):
return None

calls = []

def fake_run(cmd):
calls.append(cmd)

with patch("tarfile.open", return_value=DummyTar()), patch.object(installer, "_run_command", side_effect=fake_run):
ok = installer._install_linux(pkg)

assert ok is True
assert any(cmd[:3] == ["sudo", "apt-get", "update"] for cmd in calls)
assert any(cmd[:3] == ["sudo", "dpkg", "-i"] for cmd in calls)


def test_uninstall_linux_runs_cleanup(installer):
installer.platform = "Linux"

calls = []

def fake_subprocess_run(cmd, check=False):
calls.append((cmd, check))
return Mock()

with patch("subprocess.run", side_effect=fake_subprocess_run), patch.object(installer, "_run_command") as rc:
assert installer._uninstall_linux() is True
rc.assert_called_once_with(["sudo", "apt-get", "autoremove", "-y"])

assert any(c[0][:4] == ["sudo", "apt-get", "remove", "-y"] for c in calls)


def test_uninstall_windows_returns_false(installer):
installer.platform = "Windows"
assert installer._uninstall_windows() is False
Loading
Loading