Skip to content

Commit

Permalink
New interfaces for incoming QoS features
Browse files Browse the repository at this point in the history
Adds new PublisherOptions and SubscriptionOptions classes, with new Publisher and Subscriber constructors
to accept them. Adds the liveliness assertion callbacks that will be needed for the new Liveliness QoS policy

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
emerson authored and Emerson Knapp committed Mar 28, 2019
1 parent b352d45 commit 7cb39a4
Show file tree
Hide file tree
Showing 19 changed files with 543 additions and 22 deletions.
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "rclcpp/client.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -61,6 +62,10 @@ class CallbackGroup
RCLCPP_PUBLIC
explicit CallbackGroup(CallbackGroupType group_type);

RCLCPP_PUBLIC
const std::vector<rclcpp::PublisherBase::WeakPtr> &
get_publisher_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
get_subscription_ptrs() const;
Expand Down Expand Up @@ -92,6 +97,10 @@ class CallbackGroup
protected:
RCLCPP_DISABLE_COPY(CallbackGroup)

RCLCPP_PUBLIC
void
add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr);

RCLCPP_PUBLIC
void
add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr);
Expand Down Expand Up @@ -119,6 +128,7 @@ class CallbackGroup
CallbackGroupType type_;
// Mutex to protect the subsequent vectors of pointers.
mutable std::mutex mutex_;
std::vector<rclcpp::PublisherBase::WeakPtr> publisher_ptrs_;
std::vector<rclcpp::SubscriptionBase::WeakPtr> subscription_ptrs_;
std::vector<rclcpp::TimerBase::WeakPtr> timer_ptrs_;
std::vector<rclcpp::ServiceBase::WeakPtr> service_ptrs_;
Expand Down
8 changes: 6 additions & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ create_publisher(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
const PublisherEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool use_intra_process_comms,
std::shared_ptr<AllocatorT> allocator)
{
Expand All @@ -39,10 +41,12 @@ create_publisher(

auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(allocator),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(event_callbacks, allocator),
publisher_options,
use_intra_process_comms);
node_topics->add_publisher(pub);

node_topics->add_publisher(pub, group);

return std::dynamic_pointer_cast<PublisherT>(pub);
}

Expand Down
6 changes: 5 additions & 1 deletion rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
const SubscriptionEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
bool use_intra_process_comms,
Expand All @@ -52,7 +53,10 @@ create_subscription(

auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator);
std::forward<CallbackT>(callback),
event_callbacks,
msg_mem_strat,
allocator);

auto sub = node_topics->create_subscription(
topic_name,
Expand Down
67 changes: 62 additions & 5 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::callback_group::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Create and return a Publisher.
/// Create and return a Publisher. Note: This constructor is deprecated
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_history_depth The depth of the publisher message queue.
Expand All @@ -151,10 +151,11 @@ class Node : public std::enable_shared_from_this<Node>
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name, size_t qos_history_depth,
const std::string & topic_name,
size_t qos_history_depth,
std::shared_ptr<Alloc> allocator = nullptr);

/// Create and return a Publisher.
/// Create and return a Publisher. Note: this constructor is deprecated
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_profile The quality of service profile to pass on to the rmw implementation.
Expand All @@ -170,7 +171,24 @@ class Node : public std::enable_shared_from_this<Node>
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
std::shared_ptr<Alloc> allocator = nullptr);

/// Create and return a Subscription.
/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] group The callback group for this publisher. NULL for no callback group.
* \param[in] options Additional options for the created Publisher
* \return Shared pointer to the created publisher.
*/
template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name,
const PublisherOptions<Alloc> & options, // not defaulted b/c ambiguous with previous ctor
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr);

/// Create and return a Subscription. Note: this constructor is deprecated
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] callback The user-defined callback function.
Expand Down Expand Up @@ -203,7 +221,7 @@ class Node : public std::enable_shared_from_this<Node>
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);

/// Create and return a Subscription.
/// Create and return a Subscription. Note: this constructor is deprecated
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] qos_history_depth The depth of the subscription's incoming message queue.
Expand Down Expand Up @@ -236,6 +254,35 @@ class Node : public std::enable_shared_from_this<Node>
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] callback The user-defined callback function.
* \param[in] group The callback group for this subscription. NULL for no callback group.
* \param[in] options Additional options for the created Subscription
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
const SubscriptionOptions<Alloc> & options, // not defaulted b/c ambiguous with previous ctor
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr);

/// Create a timer.
/**
* \param[in] period Time interval between triggers of the callback.
Expand Down Expand Up @@ -600,6 +647,16 @@ class Node : public std::enable_shared_from_this<Node>
const NodeOptions &
get_node_options() const;

/// Manually assert that this Node is alive (for RMW_QOS_POLICY_MANUAL_BY_NODE)
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_NODE, the creator of this
* Node must manually call `assert_liveliness` periodically to signal that this Node
* is still alive. Must be called at least as often as qos_profile's Liveliness lease_duration
*/
RCLCPP_PUBLIC
void
assert_liveliness() {}

protected:
/// Construct a sub-node, which will extend the namespace of all entities created with it.
/**
Expand Down
71 changes: 69 additions & 2 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ namespace rclcpp
template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth,
const std::string & topic_name,
size_t qos_history_depth,
std::shared_ptr<Alloc> allocator)
{
if (!allocator) {
Expand All @@ -80,7 +81,8 @@ extend_name_with_sub_namespace(const std::string & name, const std::string & sub
template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator)
{
if (!allocator) {
Expand All @@ -91,6 +93,30 @@ Node::create_publisher(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile,
PublisherEventCallbacks(),
nullptr,
this->get_node_options().use_intra_process_comms(),
allocator);
}

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name,
const PublisherOptions<Alloc> & options,
rclcpp::callback_group::CallbackGroup::SharedPtr group)
{
std::shared_ptr<Alloc> allocator = options.allocator();
if (!allocator) {
allocator = std::make_shared<Alloc>();
}

return rclcpp::create_publisher<MessageT, Alloc, PublisherT>(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
options.qos_profile(),
options.event_callbacks(),
group,
this->get_node_options().use_intra_process_comms(),
allocator);
}
Expand Down Expand Up @@ -128,13 +154,54 @@ Node::create_subscription(
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
qos_profile,
SubscriptionEventCallbacks(),
group,
ignore_local_publications,
this->get_node_options().use_intra_process_comms(),
msg_mem_strat,
allocator);
}

template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
const SubscriptionOptions<Alloc> & options,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat)
{
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;

std::shared_ptr<Alloc> allocator = options.allocator();
if (!allocator) {
allocator = std::make_shared<Alloc>();
}

if (!msg_mem_strat) {
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
}

return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
options.qos_profile(),
options.event_callbacks(),
group,
options.ignore_local_publications(),
this->get_node_options().use_intra_process_comms(),
msg_mem_strat,
allocator);
}

template<
typename MessageT,
typename CallbackT,
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/node_interfaces/node_topics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher);
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);

RCLCPP_PUBLIC
virtual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher) = 0;
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0;

RCLCPP_PUBLIC
virtual
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/parameter_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class AsyncParametersClient
"parameter_events",
std::forward<CallbackT>(callback),
rmw_qos_profile_default,
SubscriptionEventCallbacks(),
nullptr, // group,
false, // ignore_local_publications,
false, // use_intra_process_comms_,
Expand Down
19 changes: 17 additions & 2 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,21 @@

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

// Forward declaration is used for friend statement.
namespace node_interfaces
{
// NOTE(emersonknapp) Forward declaration avoids including node_base_interface.hpp which causes
// circular inclusion from callback_group.hpp
class NodeBaseInterface;
// Forward declaration is used for friend statement.
class NodeTopicsInterface;
}

Expand Down Expand Up @@ -128,6 +132,16 @@ class PublisherBase
size_t
get_intra_process_subscription_count() const;

/// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_MANUAL_BY_TOPIC)
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_TOPIC, the creator of this
* Publisher must manually call `assert_liveliness` periodically to signal that this Publisher
* is still alive. Must be called at least as often as qos_profile's Liveliness lease_duration
*/
RCLCPP_PUBLIC
void
assert_liveliness() {}

/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
Expand Down Expand Up @@ -194,6 +208,7 @@ class Publisher : public PublisherBase
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rcl_publisher_options_t & publisher_options,
const PublisherEventCallbacks & /* event_callbacks */,
const std::shared_ptr<MessageAlloc> & allocator)
: PublisherBase(
node_base,
Expand Down
Loading

0 comments on commit 7cb39a4

Please sign in to comment.