Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Type Safety in Python Client API #42

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 92 additions & 60 deletions kos-py/pykos/services/actuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,48 @@
from google.protobuf.any_pb2 import Any as AnyPb2

from kos_protos import actuator_pb2, actuator_pb2_grpc, common_pb2
from kos_protos.actuator_pb2 import CalibrateActuatorMetadata
from kos_protos.actuator_pb2 import (
CalibrateActuatorMetadata,
CommandActuatorsRequest,
CommandActuatorsResponse,
ConfigureActuatorRequest,
GetActuatorsStateRequest,
GetActuatorsStateResponse,
)


class ActuatorCommand(TypedDict):
"""Command parameters for an actuator.

Fields:
actuator_id: The ID of the actuator to command
position: Optional target position in degrees
velocity: Optional target velocity in degrees/second
torque: Optional target torque in Nm
"""

actuator_id: int
position: NotRequired[float]
velocity: NotRequired[float]
torque: NotRequired[float]


class ConfigureActuatorRequest(TypedDict):
class ConfigureActuatorParams(TypedDict):
"""Configuration parameters for an actuator.

Fields:
actuator_id: The ID of the actuator to configure
kp: Optional proportional gain for position control
kd: Optional derivative gain for position control
ki: Optional integral gain for position control
max_torque: Optional maximum torque limit
protective_torque: Optional protective torque threshold
protection_time: Optional protection activation time
torque_enabled: Optional flag to enable/disable torque
new_actuator_id: Optional new ID to assign to the actuator
zero_position: Optional flag to set current position as zero
"""

actuator_id: int
kp: NotRequired[float]
kd: NotRequired[float]
Expand All @@ -30,10 +61,6 @@ class ConfigureActuatorRequest(TypedDict):
zero_position: NotRequired[bool]


class ActuatorStateRequest(TypedDict):
actuator_ids: list[int]


class CalibrationStatus:
Calibrating = "calibrating"
Calibrated = "calibrated"
Expand All @@ -50,8 +77,10 @@ def decode_metadata(self, metadata_any: AnyPb2) -> None:
metadata = CalibrateActuatorMetadata()
if metadata_any.Is(CalibrateActuatorMetadata.DESCRIPTOR):
metadata_any.Unpack(metadata)
self.actuator_id = metadata.actuator_id
self.status = metadata.status if metadata.HasField("status") else None
if metadata.HasField("actuator_id"):
self.actuator_id = metadata.actuator_id
if metadata.HasField("status"):
self.status = metadata.status

def __str__(self) -> str:
return f"CalibrationMetadata(actuator_id={self.actuator_id}, status={self.status})"
Expand All @@ -68,88 +97,91 @@ def __init__(self, channel: grpc.Channel) -> None:
def calibrate(self, actuator_id: int) -> CalibrationMetadata:
"""Calibrate an actuator.

Args:
actuator_id: The ID of the actuator to calibrate

Returns:
Operation: The operation for the calibration.
CalibrationMetadata object containing the actuator ID and calibration status
"""
response = self.stub.CalibrateActuator(actuator_pb2.CalibrateActuatorRequest(actuator_id=actuator_id))
metadata = CalibrationMetadata(response.metadata)
return metadata
request = actuator_pb2.CalibrateActuatorRequest(actuator_id=actuator_id)
operation = self.stub.CalibrateActuator(request)
return CalibrationMetadata(operation.metadata)

def get_calibration_status(self, actuator_id: int) -> str | None:
response = self.operations_stub.GetOperation(
operations_pb2.GetOperationRequest(name=f"operations/calibrate_actuator/{actuator_id}")
)
metadata = CalibrationMetadata(response.metadata)
return metadata.status
"""Get the calibration status of an actuator.

def command_actuators(self, commands: list[ActuatorCommand]) -> actuator_pb2.CommandActuatorsResponse:
"""Command multiple actuators at once.
Args:
actuator_id: The ID of the actuator to check

Example:
>>> command_actuators([
... {"actuator_id": 1, "position": 90.0, "velocity": 100.0, "torque": 1.0},
... {"actuator_id": 2, "position": 180.0},
... ])
Returns:
The calibration status string or None if not found
"""
request = operations_pb2.GetOperationRequest(name=str(actuator_id))
operation = self.operations_stub.GetOperation(request)
return CalibrationMetadata(operation.metadata).status if operation.metadata else None

def command_actuators(self, commands: list[ActuatorCommand]) -> CommandActuatorsResponse:
"""Send commands to multiple actuators.

Args:
commands: List of dictionaries containing actuator commands.
Each dict should have 'actuator_id' and optionally 'position',
'velocity', and 'torque'.
commands: List of dictionaries specifying commands for each actuator.
Each dictionary must have 'actuator_id' and may include:
- position: Target position in degrees
- velocity: Target velocity in degrees/second
- torque: Target torque in Nm

Returns:
List of ActionResult objects indicating success/failure for each command.
CommandActuatorsResponse containing results for each command
"""
actuator_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands]
request = actuator_pb2.CommandActuatorsRequest(commands=actuator_commands)
proto_commands = [actuator_pb2.ActuatorCommand(**cmd) for cmd in commands]
request = CommandActuatorsRequest(commands=proto_commands)
return self.stub.CommandActuators(request)

def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorRequest]) -> common_pb2.ActionResult:
"""Configure an actuator's parameters.
def configure_actuator(self, **kwargs: Unpack[ConfigureActuatorParams]) -> common_pb2.ActionResponse:
"""Configure an actuator with the specified parameters.

Example:
>>> configure_actuator(
... actuator_id=1,
... kp=1.0,
... kd=0.1,
... ki=0.01,
... max_torque=100.0,
... protective_torque=None,
... protection_time=None,
... kp=10.0,
... kd=1.0,
... ki=0.1,
... max_torque=2.0,
... protective_torque=1.5,
... protection_time=0.5,
... torque_enabled=True,
... new_actuator_id=None,
... new_actuator_id=2,
... zero_position=True
... )

>>> configure_actuator(
... actuator_id=2,
... kp=1.0,
... kd=0.1,
... torque_enabled=True,
... )

Args:
actuator_id: ID of the actuator to configure
**kwargs: Configuration parameters that may include:
kp, kd, ki, max_torque, protective_torque,
protection_time, torque_enabled, new_actuator_id
**kwargs: Configuration parameters that must include:
actuator_id: The ID of the actuator to configure
And may optionally include:
kp: Proportional gain for position control
kd: Derivative gain for position control
ki: Integral gain for position control
max_torque: Maximum torque limit
protective_torque: Protective torque threshold
protection_time: Protection activation time
torque_enabled: Flag to enable/disable torque
new_actuator_id: New ID to assign to the actuator
zero_position: Flag to set current position as zero

Returns:
ActionResponse indicating success/failure
ActionResponse indicating if the configuration was successful
"""
request = actuator_pb2.ConfigureActuatorRequest(**kwargs)
request = ConfigureActuatorRequest(**kwargs)
return self.stub.ConfigureActuator(request)

def get_actuators_state(self, actuator_ids: list[int] | None = None) -> actuator_pb2.GetActuatorsStateResponse:
"""Get the state of multiple actuators.

Example:
>>> get_actuators_state([1, 2])
def get_actuators_state(self, actuator_ids: list[int] | None = None) -> GetActuatorsStateResponse:
"""Get the current state of specified actuators.

Args:
actuator_ids: List of actuator IDs to query. If None, gets state of all actuators.
actuator_ids: Optional list of actuator IDs to query. If None, queries all actuators.

Returns:
List of ActuatorStateResponse objects containing the state information
GetActuatorsStateResponse containing states for each queried actuator
"""
request = actuator_pb2.GetActuatorsStateRequest(actuator_ids=actuator_ids or [])
request = GetActuatorsStateRequest(actuator_ids=actuator_ids or [])
return self.stub.GetActuatorsState(request)
92 changes: 82 additions & 10 deletions kos-py/pykos/services/imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,68 @@
from google.protobuf.empty_pb2 import Empty

from kos_protos import common_pb2, imu_pb2, imu_pb2_grpc
from kos_protos.imu_pb2 import CalibrateIMUMetadata
from kos_protos.imu_pb2 import (
CalibrateIMUMetadata,
IMUAdvancedValuesResponse,
IMUValuesResponse,
QuaternionResponse,
)


class EulerAnglesResponse(TypedDict):
"""TypedDict containing orientation in Euler angles.

A dictionary type containing the IMU's orientation expressed as Euler angles
in the roll-pitch-yaw convention.

Fields:
roll: Rotation around X-axis in radians
pitch: Rotation around Y-axis in radians
yaw: Rotation around Z-axis in radians
error: Optional error information if the calculation failed
"""

roll: float
pitch: float
yaw: float
error: NotRequired[common_pb2.Error | None]


class ZeroIMUResponse(TypedDict):
"""TypedDict containing response from IMU zeroing operation.

A dictionary type indicating whether the IMU zeroing operation succeeded.
Zeroing sets the current orientation as the reference orientation.

Fields:
success: Whether the zeroing operation completed successfully
error: Optional error information if the operation failed
"""

success: bool
error: NotRequired[common_pb2.Error | None]


class ActionResponse(TypedDict):
success: bool
error: NotRequired[common_pb2.Error]


class CalibrateIMUResponse(TypedDict):
name: str
metadata: AnyPb2


class ZeroIMURequest(TypedDict):
"""Parameters for zeroing the IMU.

Fields:
max_retries: Optional maximum number of retries
max_angular_error: Optional maximum angular error during zeroing
max_velocity: Optional maximum velocity during zeroing
max_acceleration: Optional maximum acceleration during zeroing
"""

max_retries: NotRequired[int]
max_angular_error: NotRequired[float]
max_velocity: NotRequired[float]
Expand Down Expand Up @@ -56,35 +114,49 @@ def __init__(self, channel: grpc.Channel) -> None:
self.stub = imu_pb2_grpc.IMUServiceStub(channel)
self.operations_stub = operations_pb2_grpc.OperationsStub(channel)

def get_imu_values(self) -> imu_pb2.IMUValuesResponse:
def get_imu_values(self) -> IMUValuesResponse:
"""Get the latest IMU sensor values.

Returns:
ImuValuesResponse: The latest IMU sensor values.
IMUValuesResponse containing:
- accel_x/y/z: Acceleration values in m/s²
- gyro_x/y/z: Angular velocity values in rad/s
- mag_x/y/z: Optional magnetic field measurements
- error: Optional error information if the reading failed
"""
return self.stub.GetValues(Empty())

def get_imu_advanced_values(self) -> imu_pb2.IMUAdvancedValuesResponse:
def get_imu_advanced_values(self) -> IMUAdvancedValuesResponse:
"""Get the latest IMU advanced values.

Returns:
ImuAdvancedValuesResponse: The latest IMU advanced values.
IMUAdvancedValuesResponse containing:
- linear_acceleration_x/y/z: Gravity-compensated acceleration in m/s²
- angular_velocity_x/y/z: Filtered angular velocity in rad/s
- error: Optional error information if the processing failed
"""
return self.stub.GetAdvancedValues(Empty())

def get_euler_angles(self) -> imu_pb2.EulerAnglesResponse:
"""Get the latest Euler angles.

Returns:
EulerAnglesResponse: The latest Euler angles.
EulerAnglesResponse containing:
- roll: Rotation around X-axis in radians
- pitch: Rotation around Y-axis in radians
- yaw: Rotation around Z-axis in radians
- error: Optional error information if the calculation failed
"""
return self.stub.GetEuler(Empty())

def get_quaternion(self) -> imu_pb2.QuaternionResponse:
def get_quaternion(self) -> QuaternionResponse:
"""Get the latest quaternion.

Returns:
QuaternionResponse: The latest quaternion.
QuaternionResponse containing:
- w: Scalar component
- x/y/z: Vector components
- error: Optional error information if the calculation failed
"""
return self.stub.GetQuaternion(Empty())

Expand All @@ -108,7 +180,7 @@ def zero(self, duration: float = 1.0, **kwargs: Unpack[ZeroIMURequest]) -> commo
max_acceleration: Maximum acceleration during zeroing

Returns:
ActionResponse: The response from the zero operation.
ActionResponse indicating if the zeroing operation was successful
"""
request = imu_pb2.ZeroIMURequest(duration=_duration_from_seconds(duration), **kwargs)
return self.stub.Zero(request)
Expand All @@ -120,6 +192,6 @@ def calibrate(self) -> imu_pb2.CalibrateIMUResponse:
using get_calibration_status().

Returns:
CalibrationMetadata: Metadata about the calibration operation.
CalibrateIMUResponse containing operation name and metadata about the calibration.
"""
return self.stub.Calibrate(Empty())
Loading