Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QoS - API and implemention for event callbacks #6

Closed
wants to merge 11 commits into from
11 changes: 11 additions & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ ament_target_dependencies(rclpy_logging
"rcutils"
)

# Signal handling library
add_library(
rclpy_signal_handler
SHARED src/rclpy/_rclpy_signal_handler.c
)
configure_python_c_extension_library(rclpy_signal_handler)
ament_target_dependencies(rclpy_signal_handler
"rcl"
"rcutils"
)

if(NOT WIN32)
ament_environment_hooks(
"${ament_cmake_package_templates_ENVIRONMENT_HOOK_LIBRARY_PATH}"
Expand Down
12 changes: 12 additions & 0 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,13 @@ def send_goal(self, goal, **kwargs):
:type goal: action_type.Goal
:return: The result response.
:rtype: action_type.Result
:raises: TypeError if the type of the passed goal isn't an instance of
the Goal type of the provided action when the service was
constructed.
"""
if not isinstance(goal, self._action_type.Goal):
raise TypeError()

event = threading.Event()

def unblock(future):
Expand Down Expand Up @@ -386,7 +392,13 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
:return: a Future instance to a goal handle that completes when the goal request
has been accepted or rejected.
:rtype: :class:`rclpy.task.Future` instance
:raises: TypeError if the type of the passed goal isn't an instance of
the Goal type of the provided action when the service was
constructed.
"""
if not isinstance(goal, self._action_type.Goal):
raise TypeError()

request = self._action_type.Impl.SendGoalService.Request()
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal
Expand Down
3 changes: 3 additions & 0 deletions rclpy/rclpy/action/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ def execute(self, execute_callback=None):
self._action_server.notify_execute(self, execute_callback)

def publish_feedback(self, feedback):
if not isinstance(feedback, self._action_server.action_type.Feedback):
raise TypeError()

with self._lock:
# Ignore for already destructed goal handles
if self._handle is None:
Expand Down
12 changes: 12 additions & 0 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,13 @@ def call(self, request: SrvTypeRequest) -> SrvTypeResponse:

:param request: The service request.
:return: The service response.
:raises: TypeError if the type of the passed request isn't an instance
of the Request type of the provided service when the client was
constructed.
"""
if not isinstance(request, self.srv_type.Request):
raise TypeError()

event = threading.Event()

def unblock(future):
Expand Down Expand Up @@ -116,7 +122,13 @@ def call_async(self, request: SrvTypeRequest) -> Future:

:param request: The service request.
:return: A future that completes when the request does.
:raises: TypeError if the type of the passed request isn't an instance
of the Request type of the provided service when the client was
constructed.
"""
if not isinstance(request, self.srv_type.Request):
raise TypeError()

sequence_number = _rclpy.rclpy_send_request(self.client_handle, request)
if sequence_number in self._pending_requests:
raise RuntimeError('Sequence (%r) conflicts with pending request' % sequence_number)
Expand Down
92 changes: 39 additions & 53 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from rclpy.guard_condition import GuardCondition
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.service import Service
from rclpy.signals import SignalHandlerGuardCondition
from rclpy.subscription import Subscription
from rclpy.task import Future
from rclpy.task import Task
Expand All @@ -46,13 +47,6 @@
from rclpy.waitable import NumberOfEntities
from rclpy.waitable import Waitable

# TODO(wjwwood): make _rclpy_wait(...) thread-safe
# Executor.spin_once() ends up calling _rclpy_wait(...), which right now is
# not thread-safe, no matter if different wait sets are used or not.
# See, for example, https://github.com/ros2/rclpy/issues/192
g_wait_set_spinning_lock = Lock()
g_wait_set_spinning = False

# For documentation purposes
# TODO(jacobperron): Make all entities implement the 'Waitable' interface for better type checking
WaitableEntityType = TypeVar('WaitableEntityType')
Expand Down Expand Up @@ -167,6 +161,7 @@ def __init__(self, *, context: Context = None) -> None:
self._cb_iter = None
self._last_args = None
self._last_kwargs = None
self._sigint_gc = SignalHandlerGuardCondition(context)

@property
def context(self) -> Context:
Expand Down Expand Up @@ -214,6 +209,9 @@ def shutdown(self, timeout_sec: float = None) -> bool:
if self._guard_condition:
_rclpy.rclpy_destroy_entity(self._guard_condition)
self._guard_condition = None
if self._sigint_gc:
self._sigint_gc.destroy()
self._sigint_gc = None
self._cb_iter = None
self._last_args = None
self._last_kwargs = None
Expand All @@ -222,6 +220,8 @@ def shutdown(self, timeout_sec: float = None) -> bool:
def __del__(self):
if self._guard_condition is not None:
_rclpy.rclpy_destroy_entity(self._guard_condition)
if self._sigint_gc is not None:
self._sigint_gc.destroy()

def add_node(self, node: 'Node') -> bool:
"""
Expand Down Expand Up @@ -480,6 +480,7 @@ def _wait_for_ready_callbacks(
entity_count.num_timers,
entity_count.num_clients,
entity_count.num_services,
entity_count.num_events,
self._context.handle)

entities = {
Expand All @@ -497,26 +498,23 @@ def _wait_for_ready_callbacks(
)
for waitable in waitables:
waitable.add_to_wait_set(wait_set)
(sigint_gc, sigint_gc_handle) = \
_rclpy.rclpy_get_sigint_guard_condition(self._context.handle)
try:
_rclpy.rclpy_wait_set_add_entity('guard_condition', wait_set, sigint_gc)
_rclpy.rclpy_wait_set_add_entity(
'guard_condition', wait_set, self._guard_condition)

# Wait for something to become ready
_rclpy.rclpy_wait(wait_set, timeout_nsec)
if self._is_shutdown:
raise ShutdownException()

# get ready entities
subs_ready = _rclpy.rclpy_get_ready_entities('subscription', wait_set)
guards_ready = _rclpy.rclpy_get_ready_entities('guard_condition', wait_set)
timers_ready = _rclpy.rclpy_get_ready_entities('timer', wait_set)
clients_ready = _rclpy.rclpy_get_ready_entities('client', wait_set)
services_ready = _rclpy.rclpy_get_ready_entities('service', wait_set)
finally:
_rclpy.rclpy_destroy_entity(sigint_gc)

sigint_gc = self._sigint_gc.guard_handle
_rclpy.rclpy_wait_set_add_entity('guard_condition', wait_set, sigint_gc)
_rclpy.rclpy_wait_set_add_entity(
'guard_condition', wait_set, self._guard_condition)

# Wait for something to become ready
_rclpy.rclpy_wait(wait_set, timeout_nsec)
if self._is_shutdown:
raise ShutdownException()

# get ready entities
subs_ready = _rclpy.rclpy_get_ready_entities('subscription', wait_set)
guards_ready = _rclpy.rclpy_get_ready_entities('guard_condition', wait_set)
timers_ready = _rclpy.rclpy_get_ready_entities('timer', wait_set)
clients_ready = _rclpy.rclpy_get_ready_entities('client', wait_set)
services_ready = _rclpy.rclpy_get_ready_entities('service', wait_set)

# Mark all guards as triggered before yielding since they're auto-taken
for gc in guards:
Expand Down Expand Up @@ -595,33 +593,21 @@ def wait_for_ready_callbacks(self, *args, **kwargs) -> Tuple[Task, WaitableEntit
.. Including the docstring for the hidden function for reference
.. automethod:: _wait_for_ready_callbacks
"""
global g_wait_set_spinning_lock
global g_wait_set_spinning
with g_wait_set_spinning_lock:
if g_wait_set_spinning:
raise RuntimeError(
'Executor.wait_for_ready_callbacks() called concurrently in multiple threads')
g_wait_set_spinning = True

try:
# if an old generator is done, this var makes the loop get a new one before returning
got_generator = False
while not got_generator:
if self._cb_iter is None or self._last_args != args or self._last_kwargs != kwargs:
# Create a new generator
self._last_args = args
self._last_kwargs = kwargs
self._cb_iter = self._wait_for_ready_callbacks(*args, **kwargs)
got_generator = True
# if an old generator is done, this var makes the loop get a new one before returning
got_generator = False
while not got_generator:
if self._cb_iter is None or self._last_args != args or self._last_kwargs != kwargs:
# Create a new generator
self._last_args = args
self._last_kwargs = kwargs
self._cb_iter = self._wait_for_ready_callbacks(*args, **kwargs)
got_generator = True

try:
return next(self._cb_iter)
except StopIteration:
# Generator ran out of work
self._cb_iter = None
finally:
with g_wait_set_spinning_lock:
g_wait_set_spinning = False
try:
return next(self._cb_iter)
except StopIteration:
# Generator ran out of work
self._cb_iter = None


class SingleThreadedExecutor(Executor):
Expand Down
1 change: 1 addition & 0 deletions rclpy/rclpy/impl/implementation_singleton.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@
rclpy_implementation = _import('._rclpy')
rclpy_action_implementation = _import('._rclpy_action')
rclpy_logging_implementation = _import('._rclpy_logging')
rclpy_signal_handler_implementation = _import('._rclpy_signal_handler')
54 changes: 44 additions & 10 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from rclpy.qos import qos_profile_parameter_events
from rclpy.qos import qos_profile_services_default
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.service import Service
from rclpy.subscription import Subscription
from rclpy.time_source import TimeSource
Expand Down Expand Up @@ -348,16 +350,23 @@ def create_publisher(
msg_type,
topic: str,
*,
qos_profile: QoSProfile = qos_profile_default
qos_profile: QoSProfile = qos_profile_default,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[PublisherEventCallbacks] = None,
) -> Publisher:
"""
Create a new publisher.

:param msg_type: The type of ROS messages the publisher will publish.
:param topic: The name of the topic the publisher will publish to.
:param qos_profile: The quality of service profile to apply to the publisher.
:param callback_group: The callback group for the publisher's event handlers.
If ``None``, then the node's default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:return: The new publisher.
"""
callback_group = callback_group or self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -368,8 +377,16 @@ def create_publisher(
failed = True
if failed:
self._validate_topic_or_service_name(topic)
publisher = Publisher(publisher_handle, msg_type, topic, qos_profile, self.handle)

publisher = Publisher(
publisher_handle, msg_type, topic, qos_profile, self.handle,
event_callbacks=event_callbacks or PublisherEventCallbacks(),
callback_group=callback_group)
self.publishers.append(publisher)

for event_callback in publisher.event_handlers:
self.add_waitable(event_callback)

return publisher

def create_subscription(
Expand All @@ -379,7 +396,8 @@ def create_subscription(
callback: Callable[[MsgType], None],
*,
qos_profile: QoSProfile = qos_profile_default,
callback_group: CallbackGroup = None,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
raw: bool = False
) -> Subscription:
"""
Expand All @@ -392,11 +410,12 @@ def create_subscription(
:param qos_profile: The quality of service profile to apply to the subscription.
:param callback_group: The callback group for the subscription. If ``None``, then the
nodes default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:param raw: If ``True``, then received messages will be stored in raw binary
representation.
"""
if callback_group is None:
callback_group = self.default_callback_group
callback_group = callback_group or self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -410,9 +429,14 @@ def create_subscription(

subscription = Subscription(
subscription_handle, subscription_pointer, msg_type,
topic, callback, callback_group, qos_profile, self.handle, raw)
topic, callback, callback_group, qos_profile, self.handle, raw,
event_callbacks=event_callbacks or SubscriptionEventCallbacks())
self.subscriptions.append(subscription)
callback_group.add_entity(subscription)

for event_handler in subscription.event_handlers:
self.add_waitable(event_handler)

return subscription

def create_client(
Expand Down Expand Up @@ -535,6 +559,11 @@ def create_guard_condition(
callback_group.add_entity(guard)
return guard

def _cleanup_publisher(self, publisher: Publisher) -> None:
"""Free all resources used by the publisher."""
publisher.event_handlers = []
_rclpy.rclpy_destroy_node_entity(publisher.publisher_handle, self.handle)

def destroy_publisher(self, publisher: Publisher) -> bool:
"""
Destroy a publisher created by the node.
Expand All @@ -543,11 +572,16 @@ def destroy_publisher(self, publisher: Publisher) -> bool:
"""
for pub in self.publishers:
if pub.publisher_handle == publisher.publisher_handle:
_rclpy.rclpy_destroy_node_entity(pub.publisher_handle, self.handle)
self._cleanup_publisher(pub)
self.publishers.remove(pub)
return True
return False

def _cleanup_subscription(self, subscription: Subscription) -> None:
"""Free all resources used by the subscription."""
subscription.event_handlers = []
_rclpy.rclpy_destroy_node_entity(subscription.subscription_handle, self.handle)

def destroy_subscription(self, subscription: Subscription) -> bool:
"""
Destroy a subscription created by the node.
Expand All @@ -556,7 +590,7 @@ def destroy_subscription(self, subscription: Subscription) -> bool:
"""
for sub in self.subscriptions:
if sub.subscription_handle == subscription.subscription_handle:
_rclpy.rclpy_destroy_node_entity(sub.subscription_handle, self.handle)
self._cleanup_subscription(sub)
self.subscriptions.remove(sub)
return True
return False
Expand Down Expand Up @@ -640,10 +674,10 @@ def destroy_node(self) -> bool:

while self.publishers:
pub = self.publishers.pop()
_rclpy.rclpy_destroy_node_entity(pub.publisher_handle, self.handle)
self._cleanup_publisher(pub)
while self.subscriptions:
sub = self.subscriptions.pop()
_rclpy.rclpy_destroy_node_entity(sub.subscription_handle, self.handle)
self._cleanup_subscription(sub)
while self.clients:
cli = self.clients.pop()
_rclpy.rclpy_destroy_node_entity(cli.client_handle, self.handle)
Expand Down
Loading