Skip to content

Commit 747a87e

Browse files
committed
Minimal hal for board tests
1 parent de26ffb commit 747a87e

File tree

8 files changed

+1173
-0
lines changed

8 files changed

+1173
-0
lines changed

kit_test/hal/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from .discovery import VidPid, discover_boards
2+
from .motor_board import VIDPID as MOTOR_VIDPID
3+
from .motor_board import MotorBoard
4+
from .power_board import BRAIN_OUTPUT, PowerBoard, PowerOutputPosition
5+
from .power_board import VIDPID as POWER_VIDPID
6+
from .servo_board import VIDPID as SERVO_VIDPID
7+
from .servo_board import ServoBoard
8+
9+
__all__ = [
10+
'BRAIN_OUTPUT',
11+
'MOTOR_VIDPID',
12+
'POWER_VIDPID',
13+
'SERVO_VIDPID',
14+
'MotorBoard',
15+
'PowerBoard',
16+
'PowerOutputPosition',
17+
'ServoBoard',
18+
'VidPid',
19+
'discover_boards',
20+
]

kit_test/hal/discovery.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""Board discovery helper methods."""
2+
from __future__ import annotations
3+
4+
import logging
5+
from typing import NamedTuple
6+
7+
from serial.tools.list_ports import comports
8+
from serial.tools.list_ports_common import ListPortInfo
9+
10+
from .utils import BoardIdentity
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class VidPid(NamedTuple):
16+
"""A named tuple containing the vendor ID and product ID of a USB device."""
17+
18+
vendor_id: int
19+
product_id: int
20+
21+
def __str__(self) -> str:
22+
return f"{self.vendor_id:04X}:{self.product_id:04X}"
23+
24+
25+
class Port(NamedTuple):
26+
"""A named tuple containing the port name and USB identity."""
27+
28+
port: str
29+
identity: BoardIdentity
30+
31+
def __str__(self) -> str:
32+
return f"{self.port} ({self.identity})"
33+
34+
35+
def discover_boards(pidvids: list[VidPid] | VidPid) -> list[Port]:
36+
"""
37+
Discover boards connected to the system.
38+
39+
:param pidvids: A list of vendor ID and product ID pairs to search for.
40+
If a single pair is given, it will be used as a filter.
41+
:return: A list of ports for the discovered boards.
42+
"""
43+
if isinstance(pidvids, VidPid):
44+
pidvids = [pidvids]
45+
46+
boards: list[Port] = []
47+
48+
for port in comports():
49+
if port.vid is None or port.pid is None:
50+
# Skip ports that don't have a VID or PID
51+
continue
52+
53+
vidpid = VidPid(port.vid, port.pid)
54+
# Filter to USB vendor and product ID values provided
55+
if vidpid not in pidvids:
56+
continue
57+
58+
# Create board identity from USB port info
59+
initial_identity = get_USB_identity(port)
60+
61+
boards.append(
62+
Port(port.device, initial_identity)
63+
)
64+
65+
return boards
66+
67+
68+
def get_USB_identity(port: ListPortInfo) -> BoardIdentity:
69+
"""
70+
Generate an approximate identity for a board using the USB descriptor.
71+
72+
This data will be overridden by the firmware once communication is established,
73+
but is used for early logging messages and error handling.
74+
75+
:param port: The USB port information from pyserial
76+
:return: An initial identity for the board
77+
"""
78+
try:
79+
return BoardIdentity(
80+
manufacturer=port.manufacturer or "",
81+
board_type=port.product or "",
82+
asset_tag=port.serial_number or "",
83+
)
84+
except Exception:
85+
logger.warning(
86+
f"Failed to pull identifying information from serial device {port.device}")
87+
return BoardIdentity()

kit_test/hal/motor_board.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
"""The motor board module provides an interface to the motor board firmware over serial."""
2+
from __future__ import annotations
3+
4+
import atexit
5+
import logging
6+
from enum import IntEnum
7+
from typing import NamedTuple
8+
9+
from .discovery import VidPid
10+
from .serial_wrapper import SerialWrapper
11+
from .utils import (
12+
BoardIdentity,
13+
map_to_float,
14+
map_to_int,
15+
)
16+
17+
logger = logging.getLogger(__name__)
18+
BAUDRATE = 115200
19+
VIDPID = VidPid(0x0403, 0x6001) # FTDI FT232R chip used on the motor board
20+
21+
22+
class MotorPower(IntEnum):
23+
"""Special values for motor power."""
24+
25+
BRAKE = 0
26+
COAST = -1024 # A value outside the allowable range
27+
28+
29+
class MotorStatus(NamedTuple):
30+
"""A tuple representing the status of the motor board."""
31+
32+
output_faults: tuple[bool, ...]
33+
input_voltage: float
34+
other: tuple[str, ...] = tuple()
35+
36+
@classmethod
37+
def from_status_response(cls, response: str) -> MotorStatus:
38+
"""
39+
Create a MotorStatus object from the response to a status command.
40+
41+
:param response: The response from a *STATUS? command.
42+
:raise TypeError: If the response is invalid.
43+
:return: A MotorStatus object.
44+
"""
45+
output_fault_str, input_voltage_mv, *other = response.split(':')
46+
output_faults = tuple((port == '1') for port in output_fault_str.split(','))
47+
input_voltage = float(input_voltage_mv) / 1000
48+
return cls(output_faults, input_voltage, tuple(other))
49+
50+
51+
class MotorBoard:
52+
"""
53+
A class representing the motor board interface.
54+
55+
This class is intended to be used to communicate with the motor board over serial
56+
using the text-based protocol added in version 4.4 of the motor board firmware.
57+
58+
:param serial_port: The serial port to connect to.
59+
:param initial_identity: The identity of the board, as reported by the USB descriptor.
60+
"""
61+
62+
def __init__(
63+
self,
64+
serial_port: str,
65+
initial_identity: BoardIdentity | None = None,
66+
) -> None:
67+
if initial_identity is None:
68+
initial_identity = BoardIdentity()
69+
self._serial = SerialWrapper(serial_port, BAUDRATE, identity=initial_identity)
70+
71+
self.motors = (
72+
Motor(self._serial, 0),
73+
Motor(self._serial, 1)
74+
)
75+
76+
identity = self.identify()
77+
assert identity.board_type == 'MCv4B', \
78+
f"Expected board type 'MCv4B', got {identity.board_type!r} instead."
79+
80+
self._serial.set_identity(identity)
81+
82+
# Disable motors on exit
83+
atexit.register(self._cleanup)
84+
85+
def identify(self) -> BoardIdentity:
86+
"""
87+
Get the identity of the board.
88+
89+
:return: The identity of the board.
90+
"""
91+
response = self._serial.query('*IDN?')
92+
return BoardIdentity(*response.split(':'))
93+
94+
def status(self) -> MotorStatus:
95+
"""
96+
The status of the board.
97+
98+
:return: The status of the board.
99+
"""
100+
response = self._serial.query('*STATUS?')
101+
return MotorStatus.from_status_response(response)
102+
103+
def reset(self) -> None:
104+
"""
105+
Reset the board.
106+
107+
This command disables the motors and clears all faults.
108+
"""
109+
self._serial.write('*RESET')
110+
111+
def _cleanup(self) -> None:
112+
"""
113+
Disable the motors while exiting.
114+
115+
This method is registered as an exit handler.
116+
"""
117+
try:
118+
self.reset()
119+
except Exception:
120+
logger.warning(f"Failed to cleanup motor board {self._serial}.")
121+
122+
def __repr__(self) -> str:
123+
return f"<{self.__class__.__qualname__}: {self._serial}>"
124+
125+
126+
class Motor:
127+
"""
128+
A class representing a motor on the motor board.
129+
130+
Each motor is controlled through the power property
131+
and its current can be read using the current property.
132+
133+
:param serial: The serial wrapper to use to communicate with the board.
134+
:param index: The index of the motor on the board.
135+
"""
136+
137+
def __init__(self, serial: SerialWrapper, index: int):
138+
self._serial = serial
139+
self._index = index
140+
141+
def get_power(self) -> float:
142+
"""
143+
Read the current power setting of the motor.
144+
145+
:return: The power of the motor as a float between -1.0 and 1.0
146+
or the special value MotorPower.COAST.
147+
"""
148+
response = self._serial.query(f'MOT:{self._index}:GET?')
149+
150+
data = response.split(':')
151+
enabled = (data[0] == '1')
152+
value = int(data[1])
153+
154+
if not enabled:
155+
return MotorPower.COAST
156+
return map_to_float(value, -1000, 1000, -1.0, 1.0, precision=3)
157+
158+
def set_power(self, value: float) -> None:
159+
"""
160+
Set the power of the motor.
161+
162+
Internally this method maps the power to an integer between
163+
-1000 and 1000 so only 3 digits of precision are available.
164+
165+
:param value: The power of the motor as a float between -1.0 and 1.0
166+
or the special values MotorPower.COAST and MotorPower.BRAKE.
167+
"""
168+
if value == MotorPower.COAST:
169+
self._serial.write(f'MOT:{self._index}:DISABLE')
170+
return
171+
172+
setpoint = map_to_int(value, -1.0, 1.0, -1000, 1000)
173+
self._serial.write(f'MOT:{self._index}:SET:{setpoint}')
174+
175+
def current(self) -> float:
176+
"""
177+
Read the current draw of the motor.
178+
179+
:return: The current draw of the motor in amps.
180+
"""
181+
response = self._serial.query(f'MOT:{self._index}:I?')
182+
return float(response) / 1000
183+
184+
def in_fault(self) -> bool:
185+
"""
186+
Check if the motor is in a fault state.
187+
188+
:return: True if the motor is in a fault state, False otherwise.
189+
"""
190+
response = self._serial.query('*STATUS?')
191+
return MotorStatus.from_status_response(response).output_faults[self._index]
192+
193+
def __repr__(self) -> str:
194+
return f"<{self.__class__.__qualname__} index={self._index} {self._serial}>"

0 commit comments

Comments
 (0)