Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion mindtrace/hardware/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ The Mindtrace Hardware module provides a unified interface for managing industri

The hardware module consists of five main subsystems:

- **Camera System**: Multi-backend camera management (Basler, GenICam, OpenCV) with bandwidth control
- **Camera System**: Multi-backend camera management (Basler, GenICam, OpenCV) with bandwidth control and liquid lens autofocus
- **Stereo Camera System**: 3D vision with depth measurement and point cloud generation (Basler Stereo ace)
- **3D Scanner System**: Industrial 3D scanning with multi-component capture (Photoneo)
- **PLC System**: Industrial PLC integration (Allen-Bradley) with tag-based operations
Expand Down Expand Up @@ -159,6 +159,32 @@ CameraManagerService.launch(port=8002, block=True)
| OpenCV | opencv-python | USB cameras, webcams |
| Mock | Built-in | Testing, CI/CD |

### Liquid Lens & Autofocus

For Basler cameras with a connected liquid lens, the system provides hardware-level focus control:

```python
from mindtrace.hardware.cameras.core.camera import Camera

camera = Camera(name="Basler:ace2_001")

# Check lens support
status = camera.get_lens_status()
if status["connected"]:
# Manual focus
camera.set_optical_power(5.0) # diopters

# One-shot autofocus
camera.trigger_autofocus(accuracy="Accurate")

# Configure autofocus behavior
camera.set_focus_config(roi_size="Size64", edge_detection=True)

camera.close()
```

Auto-detection is exposed via `get_capabilities()` (`supports_liquid_lens` field) and the REST API at `POST /cameras/capabilities`.

### Homography Measurement

Convert pixel coordinates to real-world measurements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,25 @@ def _load_config():
success_count += 1
total_settings += 1

# Restore liquid lens / focus settings
if "optical_power" in config_data:
total_settings += 1
try:
if self._has_liquid_lens() and self._is_lens_connected():
await self.set_optical_power(float(config_data["optical_power"]))
success_count += 1
except Exception as e:
self.logger.warning(f"Could not restore optical power for camera '{self.camera_name}': {e}")

if "focus_config" in config_data:
total_settings += 1
try:
if self._has_liquid_lens() and self._is_lens_connected():
await self.set_focus_config(**config_data["focus_config"])
success_count += 1
except Exception as e:
self.logger.warning(f"Could not restore focus config for camera '{self.camera_name}': {e}")

self.logger.debug(
f"Configuration imported from '{config_path}' for camera '{self.camera_name}': "
f"{success_count}/{total_settings} settings applied successfully"
Expand Down Expand Up @@ -1514,6 +1533,16 @@ async def export_config(self, config_path: str):
except Exception as e:
self.logger.warning(f"Could not get pixel format for camera '{self.camera_name}': {e}")

# Liquid lens / focus (if available)
optical_power = None
focus_config = None
try:
if self._has_liquid_lens() and self._is_lens_connected():
optical_power = await self.get_optical_power()
focus_config = await self.get_focus_config()
except Exception as e:
self.logger.debug(f"Could not export lens config for camera '{self.camera_name}': {e}")

# Create common format configuration
config_data = {
"camera_type": "basler",
Expand All @@ -1533,6 +1562,11 @@ async def export_config(self, config_path: str):
"buffer_count": getattr(self, "buffer_count", 25),
}

if optical_power is not None:
config_data["optical_power"] = optical_power
if focus_config is not None:
config_data["focus_config"] = focus_config

# Write config to file (run in threadpool to avoid blocking event loop)
def _save_config():
with open(config_path, "w") as f:
Expand Down Expand Up @@ -2291,6 +2325,229 @@ async def get_inter_packet_delay_range(self) -> List[int]:
self.logger.warning(f"Failed to get inter-packet delay range for camera '{self.camera_name}': {e}")
return [0, 65535] # Default range

# ---- Liquid lens / focus control ----

def _has_liquid_lens(self) -> bool:
"""Check if camera has liquid lens nodes in its nodemap."""
if self.camera is None:
return False
try:
# pypylon raises LogicalErrorException for missing nodes in __getattr__,
# so hasattr() alone is not reliable — wrap in try/except.
self.camera.LensConnection # noqa: B018
return True
except Exception:
return False

def _is_lens_connected(self) -> bool:
"""Check if a physical lens is connected."""
if not self._has_liquid_lens():
return False
try:
return self.camera.LensConnection.GetValue() == "Connect"
except Exception:
return False

async def get_lens_status(self) -> Dict[str, Any]:
"""Get liquid lens hardware state."""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")

if not self._has_liquid_lens():
return {"connected": False, "status": "No lens controller", "optical_power": None}

try:
await self._ensure_open()
connection = await self._run_blocking(self.camera.LensConnection.GetValue, timeout=self._op_timeout_s)
connected = connection == "Connect"

status = "Not connected"
optical_power = None
if connected:
try:
status = await self._run_blocking(self.camera.LensStatus.GetValue, timeout=self._op_timeout_s)
except Exception:
status = "Unknown"
try:
optical_power = await self._run_blocking(
self.camera.LensOpticalPower.GetValue, timeout=self._op_timeout_s
)
except Exception:
pass

return {"connected": connected, "status": status, "optical_power": optical_power}
except CameraConnectionError:
raise
except Exception as e:
self.logger.warning(f"Could not get lens status for camera '{self.camera_name}': {e}")
return {"connected": False, "status": f"Error: {e}", "optical_power": None}

async def get_optical_power(self) -> float:
"""Get current lens optical power in diopters."""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")
if not self._has_liquid_lens() or not self._is_lens_connected():
raise CameraConfigurationError(f"Camera '{self.camera_name}' does not have a connected liquid lens")
try:
await self._ensure_open()
return await self._run_blocking(self.camera.LensOpticalPower.GetValue, timeout=self._op_timeout_s)
except (CameraConnectionError, CameraConfigurationError):
raise
except Exception as e:
raise HardwareOperationError(f"Failed to get optical power for camera '{self.camera_name}': {e}") from e

async def set_optical_power(self, diopters: float):
"""Set lens optical power in diopters (manual focus)."""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")
if not self._has_liquid_lens() or not self._is_lens_connected():
raise CameraConfigurationError(f"Camera '{self.camera_name}' does not have a connected liquid lens")
try:
min_d, max_d = await self.get_optical_power_range()
if diopters < min_d or diopters > max_d:
raise CameraConfigurationError(
f"Optical power {diopters} outside valid range [{min_d}, {max_d}] for camera '{self.camera_name}'"
)
await self._ensure_open()
await self._run_blocking(self.camera.LensOpticalPower.SetValue, diopters, timeout=self._op_timeout_s)
self.logger.debug(f"Optical power set to {diopters} diopters for camera '{self.camera_name}'")
except (CameraConnectionError, CameraConfigurationError):
raise
except Exception as e:
raise HardwareOperationError(f"Failed to set optical power for camera '{self.camera_name}': {e}") from e

async def get_optical_power_range(self) -> List[float]:
"""Get optical power range [min, max] in diopters."""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")
if not self._has_liquid_lens():
raise CameraConfigurationError(f"Camera '{self.camera_name}' does not have liquid lens support")
try:
await self._ensure_open()
min_val = await self._run_blocking(self.camera.LensOpticalPower.GetMin, timeout=self._op_timeout_s)
max_val = await self._run_blocking(self.camera.LensOpticalPower.GetMax, timeout=self._op_timeout_s)
return [min_val, max_val]
except CameraConfigurationError:
raise
except Exception as e:
raise HardwareOperationError(
f"Failed to get optical power range for camera '{self.camera_name}': {e}"
) from e

async def trigger_autofocus(self, accuracy: str = "Normal") -> bool:
"""Trigger one-shot autofocus.

Args:
accuracy: "Fast", "Normal", or "Accurate".

Returns:
True when autofocus completes successfully.
"""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")
if not self._has_liquid_lens() or not self._is_lens_connected():
raise CameraConfigurationError(f"Camera '{self.camera_name}' does not have a connected liquid lens")

valid_accuracies = ("Fast", "Normal", "Accurate")
if accuracy not in valid_accuracies:
raise CameraConfigurationError(f"Invalid accuracy '{accuracy}'. Must be one of {valid_accuracies}")

try:
await self._ensure_open()

# Autofocus needs live image frames for contrast analysis.
# Start grabbing if not already active, and stop afterwards.
was_grabbing = await self._run_blocking(self.camera.IsGrabbing, timeout=self._op_timeout_s)
if not was_grabbing:
await self._ensure_grabbing()

try:
# Set accuracy before triggering
if hasattr(self.camera, "FocusAccurate"):
await self._run_blocking(self.camera.FocusAccurate.SetValue, accuracy, timeout=self._op_timeout_s)

# Trigger one-shot autofocus
await self._run_blocking(self.camera.FocusAuto.SetValue, "Once", timeout=self._op_timeout_s)

# Poll until FocusAuto returns to "Off" (autofocus complete)
af_timeout = 30.0
poll_interval = 0.2
start = time.monotonic()
while (time.monotonic() - start) < af_timeout:
status = await self._run_blocking(self.camera.FocusAuto.GetValue, timeout=self._op_timeout_s)
if status == "Off":
self.logger.info(
f"Autofocus completed for camera '{self.camera_name}' in {time.monotonic() - start:.1f}s"
)
return True
await asyncio.sleep(poll_interval)

raise CameraTimeoutError(f"Autofocus timed out after {af_timeout}s for camera '{self.camera_name}'")
finally:
if not was_grabbing:
await self._ensure_stopped_grabbing()

except (CameraConnectionError, CameraConfigurationError, CameraTimeoutError):
raise
except Exception as e:
raise HardwareOperationError(f"Autofocus failed for camera '{self.camera_name}': {e}") from e

_FOCUS_CONFIG_NODE_MAP: Dict[str, str] = {
"accuracy": "FocusAccurate",
"stepper": "FocusStepper",
"stepper_lower_limit": "FocusStepperLowerLimit",
"stepper_upper_limit": "FocusStepperUpperLimit",
"roi_size": "FocusROISize",
"focus_source": "AutoFocusSource",
"edge_detection": "EdgeDetectionFocusing",
"roi_offset_x": "BslFocusXOffset",
"roi_offset_y": "BslFocusYOffset",
}

async def get_focus_config(self) -> Dict[str, Any]:
"""Get current focus/autofocus configuration."""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")
if not self._has_liquid_lens():
raise CameraConfigurationError(f"Camera '{self.camera_name}' does not have liquid lens support")

config: Dict[str, Any] = {}
await self._ensure_open()

for key, node_name in self._FOCUS_CONFIG_NODE_MAP.items():
try:
if hasattr(self.camera, node_name):
config[key] = await self._run_blocking(
getattr(self.camera, node_name).GetValue, timeout=self._op_timeout_s
)
except Exception as e:
self.logger.debug(f"Could not read {node_name} for camera '{self.camera_name}': {e}")

return config

async def set_focus_config(self, **settings):
"""Set focus/autofocus parameters."""
if not self.initialized or self.camera is None:
raise CameraConnectionError(f"Camera '{self.camera_name}' not initialized")
if not self._has_liquid_lens() or not self._is_lens_connected():
raise CameraConfigurationError(f"Camera '{self.camera_name}' does not have a connected liquid lens")

await self._ensure_open()

for key, value in settings.items():
node_name = self._FOCUS_CONFIG_NODE_MAP.get(key)
if node_name is None:
self.logger.warning(f"Unknown focus config key '{key}'")
continue
if not hasattr(self.camera, node_name):
self.logger.warning(f"Focus node '{node_name}' not available on camera '{self.camera_name}'")
continue
try:
await self._run_blocking(getattr(self.camera, node_name).SetValue, value, timeout=self._op_timeout_s)
self.logger.debug(f"Set {node_name}={value} for camera '{self.camera_name}'")
except Exception as e:
raise CameraConfigurationError(f"Failed to set {key}={value} for camera '{self.camera_name}': {e}")

async def close(self):
"""Close the camera and release resources.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,69 @@ async def get_capture_timeout(self) -> int:
self.logger.error(f"get_capture_timeout not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"get_capture_timeout not supported by {self.__class__.__name__}")

# Liquid lens / focus control (optional, requires compatible hardware)
async def get_lens_status(self) -> Dict[str, Any]:
"""Get liquid lens hardware state.

Returns:
Dict with keys:
- connected (bool): Whether a lens is physically connected
- status (str): Lens status string (e.g., "Lens OK")
- optical_power (float | None): Current optical power in diopters
"""
self.logger.error(f"get_lens_status not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"get_lens_status not supported by {self.__class__.__name__}")

async def get_optical_power(self) -> float:
"""Get current lens optical power in diopters."""
self.logger.error(f"get_optical_power not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"get_optical_power not supported by {self.__class__.__name__}")

async def set_optical_power(self, diopters: float):
"""Set lens optical power in diopters (manual focus).

Args:
diopters: Target optical power within the lens range.
"""
self.logger.error(f"set_optical_power not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"set_optical_power not supported by {self.__class__.__name__}")

async def get_optical_power_range(self) -> List[float]:
"""Get optical power range [min, max] in diopters."""
self.logger.error(f"get_optical_power_range not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"get_optical_power_range not supported by {self.__class__.__name__}")

async def trigger_autofocus(self, accuracy: str = "Normal") -> bool:
"""Trigger one-shot autofocus.

Args:
accuracy: Autofocus accuracy mode — "Fast", "Normal", or "Accurate".

Returns:
True when autofocus completes successfully.
"""
self.logger.error(f"trigger_autofocus not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"trigger_autofocus not supported by {self.__class__.__name__}")

async def get_focus_config(self) -> Dict[str, Any]:
"""Get current focus/autofocus configuration.

Returns:
Dict with keys: accuracy, stepper, stepper_lower_limit, stepper_upper_limit,
roi_size, focus_source, edge_detection, roi_offset_x, roi_offset_y.
"""
self.logger.error(f"get_focus_config not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"get_focus_config not supported by {self.__class__.__name__}")

async def set_focus_config(self, **settings):
"""Set focus/autofocus parameters.

Args:
**settings: Keys matching get_focus_config() return values.
"""
self.logger.error(f"set_focus_config not implemented for {self.__class__.__name__}")
raise NotImplementedError(f"set_focus_config not supported by {self.__class__.__name__}")

async def __aenter__(self):
await self.setup_camera()
return self
Expand Down
Loading