Skip to content
69 changes: 69 additions & 0 deletions source/modulo_components/modulo_components/component_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import modulo_core.translators.message_writers as modulo_writers
import state_representation as sr
from geometry_msgs.msg import TransformStamped
from modulo_interfaces.msg import Assignment as AssignmentMsg
from modulo_interfaces.msg import Predicate as PredicateMsg
from modulo_interfaces.msg import PredicateCollection
from modulo_interfaces.srv import EmptyTrigger, StringTrigger
Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(self, node_name: str, *args, **kwargs):
super().__init__(node_name, *args, **node_kwargs)
self.__step_lock = Lock()
self.__parameter_dict: Dict[str, Union[str, sr.Parameter]] = {}
self.__assignment_dict: Dict[str, sr.Parameter] = {}
self.__read_only_parameters: Dict[str, bool] = {}
self.__pre_set_parameters_callback_called = False
self.__set_parameters_result = SetParametersResult()
Expand All @@ -68,6 +70,9 @@ def __init__(self, node_name: str, *args, **kwargs):
self.add_parameter(sr.Parameter("rate", 10.0, sr.ParameterType.DOUBLE),
"The rate in Hertz for all periodic callbacks")

self.__assignment_publisher = self.create_publisher(AssignmentMsg, "/assignments", self.__qos)
self.__assignment_message = AssignmentMsg()
self.__assignment_message.node = self.get_fully_qualified_name()
self.__predicate_publisher = self.create_publisher(PredicateCollection, "/predicates", self.__qos)
self.__predicate_message = PredicateCollection()
self.__predicate_message.node = self.get_fully_qualified_name()
Expand Down Expand Up @@ -342,6 +347,70 @@ def add_predicate(self, name: str, predicate: Union[bool, Callable[[], bool]]) -
except Exception as e:
self.get_logger().error(f"Failed to add predicate '{name}': {e}")

def add_assignment(self, name: str, type: sr.ParameterType) -> None:
"""
Add an assignment to the dictionary of assignments.

:param name: The name of the assignment
:param type: The type of the assignment
"""
parsed_name = parse_topic_name(name)
if not parsed_name:
self.get_logger().error(
f"The parsed name for assignment '{name}' is empty. Provide a "
"string with valid characters for the assignment name ([a-z0-9_]).")
return
if parsed_name != name:
self.get_logger().error(
f"The parsed name for assignment '{name}' is '{parsed_name}'. Use the parsed name "
"to refer to this assignment.")
if parsed_name in self.__assignment_dict.keys():
self.get_logger().warn(f"Assignment with name '{parsed_name}' already exists, overwriting.")
else:
self.get_logger().debug(f"Adding assignment '{parsed_name}'.")
try:
self.__assignment_dict[parsed_name] = sr.Parameter(parsed_name, type)
except Exception as e:
self.get_logger().error(f"Failed to add assignment '{parsed_name}': {e}")

def get_assignment(self, name: str) -> T:
"""
Get the assignment value from the assignment dictionary by its name.

:param name: The name of the assignment
:raises InvalidAssignmentError: if the assignment does not exist
:raises EmptyStateError: if the assignment has not been set yet
:return: The value of the assignment, if the assignment exists and has been assigned
"""
if name not in self.__assignment_dict.keys():
raise InvalidAssignmentError(f"Assignment '{name}' is not in the dict of assignments")
if self.__assignment_dict[name].is_empty():
# TODO: remove after control libraries v9.3.1
raise sr.exceptions.EmptyStateError(f"{name} state is empty")
return self.__assignment_dict[name].get_value()

def set_assignment(self, name: str, value: T) -> None:
"""
Set the value of an assignment. The assignment must have been previously declared.

:param name: The name of the assignment
:param value: The value of the assignment
"""
if name not in self.__assignment_dict.keys():
self.get_logger().error(
f"Failed to set assignment '{name}': Assignment does not exist.", throttle_duration_sec=1.0)
return
try:
self.__assignment_dict[name].set_value(value)
ros_param = write_parameter(self.__assignment_dict[name])
except Exception as e:
self.get_logger().error(f"Failed to set assignment '{name}': {e}", throttle_duration_sec=1.0)
return

message = copy.copy(self.__assignment_message)
message.assignment = ros_param.to_parameter_msg()
self.__assignment_publisher.publish(message)

def get_predicate(self, name: str) -> bool:
"""
Get the value of the predicate given as parameter. If the predicate is not found or the callable function fails,
Expand Down
34 changes: 33 additions & 1 deletion source/modulo_components/test/python/test_component_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import state_representation as sr
from modulo_interfaces.srv import EmptyTrigger, StringTrigger
from modulo_components.component_interface import ComponentInterface
from modulo_core.exceptions import CoreError, LookupTransformError
from modulo_core.exceptions import CoreError, LookupTransformError, InvalidAssignmentError
from rclpy.qos import QoSProfile
from std_msgs.msg import Bool, String
from sensor_msgs.msg import JointState
Expand Down Expand Up @@ -71,6 +71,38 @@ def test_set_predicate(component_interface):
assert not component_interface.get_predicate('bar')


def test_add_assignment(component_interface):
component_interface.add_assignment('string_assignment', sr.ParameterType.STRING)
assert len(component_interface._ComponentInterface__assignment_dict) == 1
# adding an empty assignment should fail
component_interface.add_assignment('', sr.ParameterType.STRING)
assert len(component_interface._ComponentInterface__assignment_dict) == 1
# adding the assignment again should just overwrite
component_interface.add_assignment('string_assignment', sr.ParameterType.STRING)
assert len(component_interface._ComponentInterface__assignment_dict) == 1
# names should be cleaned up
component_interface.add_assignment('7cleEaGn_AaSssiGNgn#ment', sr.ParameterType.STRING)
assert 'clean_assignment' in component_interface._ComponentInterface__assignment_dict.keys()
assert len(component_interface._ComponentInterface__assignment_dict) == 2
# names without valid characters should fail
component_interface.add_assignment('@@@@@@@', sr.ParameterType.STRING)
assert len(component_interface._ComponentInterface__assignment_dict) == 2


def test_get_set_assignment(component_interface):
component_interface.add_assignment('int_assignment', sr.ParameterType.INT)
with pytest.raises(InvalidAssignmentError):
component_interface.get_assignment('string_assignment')
component_interface.set_assignment('string_assignment', 'test')
with pytest.raises(sr.exceptions.EmptyStateError):
component_interface.get_assignment('int_assignment')
# setting the wrong type of value should fail
component_interface.set_assignment('int_assignment', 'test')
assert component_interface._ComponentInterface__assignment_dict['int_assignment'].is_empty()
component_interface.set_assignment('int_assignment', 5)
assert component_interface.get_assignment('int_assignment') == 5


def test_declare_signal(component_interface):
component_interface.declare_input("input", "test")
assert component_interface.get_parameter_value("input_topic") == "test"
Expand Down
8 changes: 8 additions & 0 deletions source/modulo_core/modulo_core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ class LookupJointPositionsException(CoreError):

def __init__(self, message: str):
super().__init__(message, "LookupJointPositionsException")

class InvalidAssignmentError(CoreError):
"""
An exception class to notify errors when getting the value of an assignment.
"""

def __init__(self, message: str):
super().__init__(message, "InvalidAssignmentError")
Loading