Skip to content

Commit

Permalink
QoS event callbacks implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Apr 16, 2019
1 parent 4802211 commit ae0f5eb
Show file tree
Hide file tree
Showing 9 changed files with 602 additions and 18 deletions.
1 change: 1 addition & 0 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ def _wait_for_ready_callbacks(
entity_count.num_timers,
entity_count.num_clients,
entity_count.num_services,
entity_count.num_events,
self._context.handle)

entities = {
Expand Down
49 changes: 42 additions & 7 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from rclpy.qos import qos_profile_parameter_events
from rclpy.qos import qos_profile_services_default
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.service import Service
from rclpy.subscription import Subscription
from rclpy.time_source import TimeSource
Expand Down Expand Up @@ -348,16 +350,24 @@ def create_publisher(
msg_type,
topic: str,
*,
qos_profile: QoSProfile = qos_profile_default
qos_profile: QoSProfile = qos_profile_default,
callback_group: CallbackGroup = None,
event_callbacks: PublisherEventCallbacks = PublisherEventCallbacks(),
) -> Publisher:
"""
Create a new publisher.
:param msg_type: The type of ROS messages the publisher will publish.
:param topic: The name of the topic the publisher will publish to.
:param qos_profile: The quality of service profile to apply to the publisher.
:param callback_group: The callback group for the publisher's event handlers.
If ``None``, then the node's default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:return: The new publisher.
"""
if callback_group is None:
callback_group = self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -368,8 +378,16 @@ def create_publisher(
failed = True
if failed:
self._validate_topic_or_service_name(topic)
publisher = Publisher(publisher_handle, msg_type, topic, qos_profile, self.handle)

publisher = Publisher(
publisher_handle, msg_type, topic, qos_profile, self.handle,
event_callbacks=event_callbacks,
callback_group=callback_group)
self.publishers.append(publisher)

for event_callback in publisher.event_handlers:
self.add_waitable(event_callback)

return publisher

def create_subscription(
Expand All @@ -380,6 +398,7 @@ def create_subscription(
*,
qos_profile: QoSProfile = qos_profile_default,
callback_group: CallbackGroup = None,
event_callbacks: SubscriptionEventCallbacks = SubscriptionEventCallbacks(),
raw: bool = False
) -> Subscription:
"""
Expand All @@ -392,6 +411,7 @@ def create_subscription(
:param qos_profile: The quality of service profile to apply to the subscription.
:param callback_group: The callback group for the subscription. If ``None``, then the
nodes default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:param raw: If ``True``, then received messages will be stored in raw binary
representation.
"""
Expand All @@ -410,9 +430,13 @@ def create_subscription(

subscription = Subscription(
subscription_handle, subscription_pointer, msg_type,
topic, callback, callback_group, qos_profile, self.handle, raw)
topic, callback, callback_group, qos_profile, self.handle, raw, event_callbacks)
self.subscriptions.append(subscription)
callback_group.add_entity(subscription)

for event_handler in subscription.event_handlers:
self.add_waitable(event_handler)

return subscription

def create_client(
Expand Down Expand Up @@ -535,6 +559,11 @@ def create_guard_condition(
callback_group.add_entity(guard)
return guard

def _cleanup_publisher(self, publisher: Publisher) -> None:
"""Free all resources used by the publisher."""
publisher.event_handlers = []
_rclpy.rclpy_destroy_node_entity(publisher.publisher_handle, self.handle)

def destroy_publisher(self, publisher: Publisher) -> bool:
"""
Destroy a publisher created by the node.
Expand All @@ -543,11 +572,16 @@ def destroy_publisher(self, publisher: Publisher) -> bool:
"""
for pub in self.publishers:
if pub.publisher_handle == publisher.publisher_handle:
_rclpy.rclpy_destroy_node_entity(pub.publisher_handle, self.handle)
self._cleanup_publisher(pub)
self.publishers.remove(pub)
return True
return False

def _cleanup_subscription(self, subscription: Subscription) -> None:
"""Free all resources used by the subscription."""
subscription.event_handlers = []
_rclpy.rclpy_destroy_node_entity(subscription.subscription_handle, self.handle)

def destroy_subscription(self, subscription: Subscription) -> bool:
"""
Destroy a subscription created by the node.
Expand All @@ -556,7 +590,7 @@ def destroy_subscription(self, subscription: Subscription) -> bool:
"""
for sub in self.subscriptions:
if sub.subscription_handle == subscription.subscription_handle:
_rclpy.rclpy_destroy_node_entity(sub.subscription_handle, self.handle)
self._cleanup_subscription(sub)
self.subscriptions.remove(sub)
return True
return False
Expand Down Expand Up @@ -630,6 +664,7 @@ def destroy_node(self) -> bool:
:return: ``True`` if successful, ``False`` otherwise.
"""
# TODO(eknapp) kill between "Shutting down" and liveliness callback, segfault happens
ret = True
if self.handle is None:
return ret
Expand All @@ -640,10 +675,10 @@ def destroy_node(self) -> bool:

while self.publishers:
pub = self.publishers.pop()
_rclpy.rclpy_destroy_node_entity(pub.publisher_handle, self.handle)
self._cleanup_publisher(pub)
while self.subscriptions:
sub = self.subscriptions.pop()
_rclpy.rclpy_destroy_node_entity(sub.subscription_handle, self.handle)
self._cleanup_subscription(sub)
while self.clients:
cli = self.clients.pop()
_rclpy.rclpy_destroy_node_entity(cli.client_handle, self.handle)
Expand Down
26 changes: 25 additions & 1 deletion rclpy/rclpy/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
from typing import TypeVar

from rclpy.callback_groups import CallbackGroup
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import QoSEventHandler
from rclpy.qos_event import QoSPublisherEventType

MsgType = TypeVar('MsgType')

Expand All @@ -28,7 +33,9 @@ def __init__(
msg_type: MsgType,
topic: str,
qos_profile: QoSProfile,
node_handle
node_handle,
event_callbacks: PublisherEventCallbacks,
callback_group: CallbackGroup,
) -> None:
"""
Create a container for a ROS publisher.
Expand All @@ -51,6 +58,23 @@ def __init__(
self.topic = topic
self.qos_profile = qos_profile
self.node_handle = node_handle
self.callback_group = callback_group

self.event_callbacks = event_callbacks
self.event_handlers: List[QoSEventHandler] = []
if event_callbacks:
if event_callbacks.deadline:
self.event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=event_callbacks.deadline,
event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED,
parent_handle=publisher_handle))
if event_callbacks.liveliness:
self.event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=event_callbacks.liveliness,
event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST,
parent_handle=publisher_handle))

def publish(self, msg: MsgType) -> None:
"""
Expand Down
172 changes: 172 additions & 0 deletions rclpy/rclpy/qos_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple
from enum import IntEnum
from typing import Callable

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.waitable import NumberOfEntities
from rclpy.waitable import Waitable


class QoSPublisherEventType(IntEnum):
"""
Enum for types of QoS events that a Publisher can receive.
This enum matches the one defined in rcl/event.h
"""

RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = 0
RCL_PUBLISHER_LIVELINESS_LOST = 1


class QoSSubscriptionEventType(IntEnum):
"""
Enum for types of QoS events that a Subscription can receive.
This enum matches the one defined in rcl/event.h
"""

RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = 0
RCL_SUBSCRIPTION_LIVELINESS_CHANGED = 1


"""
Payload type for Subscription Deadline callback.
Mirrors rmw_requested_deadline_missed_status_t from rmw/types.h
"""
QoSRequestedDeadlineMissedInfo = namedtuple(
'QoSRequestedDeadlineMissedInfo',
('total_count', 'total_count_change'))

"""
Payload type for Subscription Liveliness callback.
Mirrors rmw_liveliness_changed_status_t from rmw/types.h
"""
QoSLivelinessChangedInfo = namedtuple(
'QoSLivelinessChangedInfo',
('alive_count', 'not_alive_count', 'alive_count_change', 'not_alive_count_change'))

"""
Payload type for Publisher Deadline callback.
Mirrors rmw_offered_deadline_missed_status_t from rmw/types.h
"""
QoSOfferedDeadlineMissedInfo = namedtuple(
'QoSOfferedDeadlineMissedInfo',
('total_count', 'total_count_change'))

"""
Payload type for Publisher Liveliness callback.
Mirrors rmw_liveliness_lost_status_t from rmw/types.h
"""
QoSLivelinessLostInfo = namedtuple(
'QoSLivelinessLostInfo',
('total_count', 'total_count_change'))


class SubscriptionEventCallbacks:
"""Container to provide middleware event callbacks for a Subscription."""

def __init__(
self,
*,
deadline: Callable[[QoSRequestedDeadlineMissedInfo], None] = None,
liveliness: Callable[[QoSLivelinessChangedInfo], None] = None,
) -> None:
"""
Constructor.
:param deadline: A user-defined callback that is called when a topic misses our
requested Deadline.
:param liveliness: A user-defined callback that is called when the Liveliness of
a Publisher on subscribed topic changes.
"""
self.deadline = deadline
self.liveliness = liveliness


class PublisherEventCallbacks:
"""Container to provide middleware event callbacks for a Publisher."""

def __init__(
self,
*,
deadline: Callable[[QoSOfferedDeadlineMissedInfo], None] = None,
liveliness: Callable[[QoSLivelinessLostInfo], None] = None
) -> None:
"""
Constructor.
:param deadline: A user-defined callback that is called when the Publisher misses
its offered Deadline.
:param liveliness: A user-defined callback that is called when this Publisher
fails to signal its Liveliness and is reported as not-alive.
"""
self.deadline = deadline
self.liveliness = liveliness


class QoSEventHandler(Waitable):
"""Waitable type to handle QoS events."""

def __init__(
self,
*,
callback_group,
callback,
event_type,
parent_handle,
):
# Waitable init adds self to callback_group
super().__init__(callback_group)
self.event_type = event_type
self.callback = callback

self._parent_handle = parent_handle
self._event_handle = _rclpy.rclpy_create_event(event_type, parent_handle)
self._event_is_ready = False
self._event_index = None

# Start Waitable API
def is_ready(self, wait_set):
"""Return True if entities are ready in the wait set."""
if _rclpy.rclpy_wait_set_is_ready('event', wait_set, self._event_index):
self._event_is_ready = True
return self._event_is_ready

def take_data(self):
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
if self._event_is_ready:
self._event_is_ready = False
return _rclpy.rclpy_take_event(
self._event_handle, self._parent_handle, self.event_type)
return None

async def execute(self, taken_data):
"""Execute work after data has been taken from a ready wait set."""
self.callback(taken_data)

def get_num_entities(self):
"""Return number of each type of entity used."""
return NumberOfEntities(num_events=1)

def add_to_wait_set(self, wait_set):
"""Add entites to wait set."""
self._event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, self._event_handle)
# End Waitable API
Loading

0 comments on commit ae0f5eb

Please sign in to comment.