Skip to content

Commit 2e4c97a

Browse files
wjwwoodpaudrow
andauthored
[rclcpp] Type Adaptation feature (#1557)
* initial version of type_adaptor.hpp Signed-off-by: William Woodall <william@osrfoundation.org> * initial version of rclcpp::get_message_type_support_handle() Signed-off-by: William Woodall <william@osrfoundation.org> * initial version of rclcpp::is_ros_compatible_type check Signed-off-by: William Woodall <william@osrfoundation.org> * fixup include statement order in publisher.hpp Signed-off-by: William Woodall <william@osrfoundation.org> * use new rclcpp::get_message_type_support_handle() and check in Publisher Signed-off-by: William Woodall <william@osrfoundation.org> * update adaptor->adapter, update TypeAdapter to use two arguments, add implicit default Signed-off-by: William Woodall <william@osrfoundation.org> * move away from shared_ptr<allocator> to just allocator, like the STL Signed-off-by: William Woodall <william@osrfoundation.org> * fixes to TypeAdapter and adding new publish function signatures Signed-off-by: William Woodall <william@osrfoundation.org> * bugfixes Signed-off-by: William Woodall <william@osrfoundation.org> * more bugfixes Signed-off-by: William Woodall <william@osrfoundation.org> * Add nullptr check Signed-off-by: Audrow Nash <audrow@hey.com> * Remove public from struct inheritance Signed-off-by: Audrow Nash <audrow@hey.com> * Add tests for publisher with type adapter Signed-off-by: Audrow Nash <audrow@hey.com> * Update packages to C++17 Signed-off-by: Audrow Nash <audrow@hey.com> * Revert "Update packages to C++17" This reverts commit 4585605. Signed-off-by: William Woodall <william@osrfoundation.org> * Begin updating AnySubscriptionCallback to use the TypeAdapter Signed-off-by: Audrow Nash <audrow@hey.com> * Use type adapter's custom type Signed-off-by: Audrow Nash <audrow@hey.com> * Correct which AnySubscriptionCallbackHelper is selected Signed-off-by: Audrow Nash <audrow@hey.com> * Setup dispatch function to work with adapted types Signed-off-by: Audrow Nash <audrow@hey.com> * Improve template logic on dispatch methods Signed-off-by: Audrow Nash <audrow@hey.com> * implement TypeAdapter for Subscription Signed-off-by: William Woodall <william@osrfoundation.org> * Add intraprocess tests with all supported message types Signed-off-by: Audrow Nash <audrow@hey.com> * Add intra process tests Signed-off-by: Audrow Nash <audrow@hey.com> * Add tests for subscription with type adapter Signed-off-by: Audrow Nash <audrow@hey.com> * Fix null allocator test Signed-off-by: Audrow Nash <audrow@hey.com> * Handle serialized message correctly Signed-off-by: Audrow Nash <audrow@hey.com> * Fix generic subscription Signed-off-by: Audrow Nash <audrow@hey.com> * Fix trailing space Signed-off-by: Audrow Nash <audrow@hey.com> * fix some issues found while testing type_adapter in demos Signed-off-by: William Woodall <william@osrfoundation.org> * add more tests, WIP Signed-off-by: William Woodall <william@osrfoundation.org> * Improve pub/sub tests Signed-off-by: Audrow Nash <audrow@hey.com> * Apply uncrustify formatting Signed-off-by: Audrow Nash <audrow@hey.com> * finish new tests for any subscription callback with type adapter Signed-off-by: William Woodall <william@osrfoundation.org> * fix adapt_type<...>::as<...> syntax Signed-off-by: William Woodall <william@osrfoundation.org> * fix explicit template instantiation of create_subscription() in new test Signed-off-by: William Woodall <william@osrfoundation.org> * cpplint fix Signed-off-by: William Woodall <william@osrfoundation.org> * Fix bug by aligning allocator types on both sides of ipm Signed-off-by: Audrow Nash <audrow@hey.com> * Fix intra process manager tests Signed-off-by: Audrow Nash <audrow@hey.com> Co-authored-by: Audrow Nash <audrow@hey.com>
1 parent 0659d82 commit 2e4c97a

31 files changed

+2792
-368
lines changed

rclcpp/include/rclcpp/any_subscription_callback.hpp

Lines changed: 627 additions & 103 deletions
Large diffs are not rendered by default.

rclcpp/include/rclcpp/create_subscription.hpp

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ template<
4747
typename MessageT,
4848
typename CallbackT,
4949
typename AllocatorT,
50-
typename CallbackMessageT,
5150
typename SubscriptionT,
5251
typename MessageMemoryStrategyT,
5352
typename NodeParametersT,
54-
typename NodeTopicsT>
53+
typename NodeTopicsT,
54+
typename ROSMessageType = typename SubscriptionT::ROSMessageType>
5555
typename std::shared_ptr<SubscriptionT>
5656
create_subscription(
5757
NodeParametersT & node_parameters,
@@ -70,7 +70,7 @@ create_subscription(
7070
using rclcpp::node_interfaces::get_node_topics_interface;
7171
auto node_topics_interface = get_node_topics_interface(node_topics);
7272

73-
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>
73+
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>
7474
subscription_topic_stats = nullptr;
7575

7676
if (rclcpp::detail::resolve_enable_topic_statistics(
@@ -92,11 +92,11 @@ create_subscription(
9292
qos);
9393

9494
subscription_topic_stats = std::make_shared<
95-
rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>
95+
rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>
9696
>(node_topics_interface->get_node_base_interface()->get_name(), publisher);
9797

9898
std::weak_ptr<
99-
rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>
99+
rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>
100100
> weak_subscription_topic_stats(subscription_topic_stats);
101101
auto sub_call_back = [weak_subscription_topic_stats]() {
102102
auto subscription_topic_stats = weak_subscription_topic_stats.lock();
@@ -153,7 +153,6 @@ create_subscription(
153153
* \tparam MessageT
154154
* \tparam CallbackT
155155
* \tparam AllocatorT
156-
* \tparam CallbackMessageT
157156
* \tparam SubscriptionT
158157
* \tparam MessageMemoryStrategyT
159158
* \tparam NodeT
@@ -171,13 +170,8 @@ template<
171170
typename MessageT,
172171
typename CallbackT,
173172
typename AllocatorT = std::allocator<void>,
174-
typename CallbackMessageT =
175-
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
176-
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
177-
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
178-
CallbackMessageT,
179-
AllocatorT
180-
>,
173+
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
174+
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType,
181175
typename NodeT>
182176
typename std::shared_ptr<SubscriptionT>
183177
create_subscription(
@@ -194,7 +188,7 @@ create_subscription(
194188
)
195189
{
196190
return rclcpp::detail::create_subscription<
197-
MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT>(
191+
MessageT, CallbackT, AllocatorT, SubscriptionT, MessageMemoryStrategyT>(
198192
node, node, topic_name, qos, std::forward<CallbackT>(callback), options, msg_mem_strat);
199193
}
200194

@@ -206,13 +200,8 @@ template<
206200
typename MessageT,
207201
typename CallbackT,
208202
typename AllocatorT = std::allocator<void>,
209-
typename CallbackMessageT =
210-
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
211-
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
212-
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
213-
CallbackMessageT,
214-
AllocatorT
215-
>>
203+
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
204+
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType>
216205
typename std::shared_ptr<SubscriptionT>
217206
create_subscription(
218207
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr & node_parameters,
@@ -229,7 +218,7 @@ create_subscription(
229218
)
230219
{
231220
return rclcpp::detail::create_subscription<
232-
MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT>(
221+
MessageT, CallbackT, AllocatorT, SubscriptionT, MessageMemoryStrategyT>(
233222
node_parameters, node_topics, topic_name, qos,
234223
std::forward<CallbackT>(callback), options, msg_mem_strat);
235224
}

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "rclcpp/allocator/allocator_deleter.hpp"
3434
#include "rclcpp/experimental/subscription_intra_process.hpp"
3535
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
36+
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
3637
#include "rclcpp/logger.hpp"
3738
#include "rclcpp/logging.hpp"
3839
#include "rclcpp/macros.hpp"
@@ -181,7 +182,7 @@ class IntraProcessManager
181182
do_intra_process_publish(
182183
uint64_t intra_process_publisher_id,
183184
std::unique_ptr<MessageT, Deleter> message,
184-
std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
185+
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
185186
{
186187
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
187188
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
@@ -226,7 +227,7 @@ class IntraProcessManager
226227
{
227228
// Construct a new shared pointer from the message
228229
// for the buffers that do not require ownership
229-
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
230+
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
230231

231232
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
232233
shared_msg, sub_ids.take_shared_subscriptions);
@@ -243,7 +244,7 @@ class IntraProcessManager
243244
do_intra_process_publish_and_return_shared(
244245
uint64_t intra_process_publisher_id,
245246
std::unique_ptr<MessageT, Deleter> message,
246-
std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
247+
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
247248
{
248249
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
249250
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
@@ -271,7 +272,7 @@ class IntraProcessManager
271272
} else {
272273
// Construct a new shared pointer from the message for the buffers that
273274
// do not require ownership and to return.
274-
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
275+
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
275276

276277
if (!sub_ids.take_shared_subscriptions.empty()) {
277278
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
@@ -368,12 +369,12 @@ class IntraProcessManager
368369
auto subscription_base = subscription_it->second.subscription.lock();
369370
if (subscription_base) {
370371
auto subscription = std::dynamic_pointer_cast<
371-
rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
372+
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
372373
>(subscription_base);
373374
if (nullptr == subscription) {
374375
throw std::runtime_error(
375376
"failed to dynamic cast SubscriptionIntraProcessBase to "
376-
"SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
377+
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
377378
"can happen when the publisher and subscription use different "
378379
"allocator types, which is not supported");
379380
}
@@ -393,7 +394,7 @@ class IntraProcessManager
393394
add_owned_msg_to_buffers(
394395
std::unique_ptr<MessageT, Deleter> message,
395396
std::vector<uint64_t> subscription_ids,
396-
std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
397+
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
397398
{
398399
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
399400
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
@@ -406,12 +407,12 @@ class IntraProcessManager
406407
auto subscription_base = subscription_it->second.subscription.lock();
407408
if (subscription_base) {
408409
auto subscription = std::dynamic_pointer_cast<
409-
rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
410+
rclcpp::experimental::SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>
410411
>(subscription_base);
411412
if (nullptr == subscription) {
412413
throw std::runtime_error(
413414
"failed to dynamic cast SubscriptionIntraProcessBase to "
414-
"SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
415+
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
415416
"can happen when the publisher and subscription use different "
416417
"allocator types, which is not supported");
417418
}
@@ -423,8 +424,8 @@ class IntraProcessManager
423424
// Copy the message since we have additional subscriptions to serve
424425
MessageUniquePtr copy_message;
425426
Deleter deleter = message.get_deleter();
426-
auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
427-
MessageAllocTraits::construct(*allocator.get(), ptr, *message);
427+
auto ptr = MessageAllocTraits::allocate(allocator, 1);
428+
MessageAllocTraits::construct(allocator, ptr, *message);
428429
copy_message = MessageUniquePtr(ptr, deleter);
429430

430431
subscription->provide_intra_process_message(std::move(copy_message));

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

Lines changed: 29 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
3131
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
3232
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
33+
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
3334
#include "rclcpp/type_support_decl.hpp"
3435
#include "rclcpp/waitable.hpp"
3536
#include "tracetools/tracetools.h"
@@ -44,21 +45,27 @@ template<
4445
typename Alloc = std::allocator<void>,
4546
typename Deleter = std::default_delete<MessageT>,
4647
typename CallbackMessageT = MessageT>
47-
class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
48+
class SubscriptionIntraProcess
49+
: public SubscriptionIntraProcessBuffer<
50+
MessageT,
51+
Alloc,
52+
Deleter
53+
>
4854
{
49-
public:
50-
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)
51-
52-
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
53-
using MessageAlloc = typename MessageAllocTraits::allocator_type;
54-
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
55-
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
56-
57-
using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
55+
using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer<
5856
MessageT,
5957
Alloc,
6058
Deleter
61-
>::UniquePtr;
59+
>;
60+
61+
public:
62+
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess)
63+
64+
using MessageAllocTraits = typename SubscriptionIntraProcessBufferT::MessageAllocTraits;
65+
using MessageAlloc = typename SubscriptionIntraProcessBufferT::MessageAlloc;
66+
using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstMessageSharedPtr;
67+
using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::MessageUniquePtr;
68+
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
6269

6370
SubscriptionIntraProcess(
6471
AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
@@ -67,31 +74,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
6774
const std::string & topic_name,
6875
rmw_qos_profile_t qos_profile,
6976
rclcpp::IntraProcessBufferType buffer_type)
70-
: SubscriptionIntraProcessBase(topic_name, qos_profile),
77+
: SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>(
78+
allocator,
79+
context,
80+
topic_name,
81+
qos_profile,
82+
buffer_type),
7183
any_callback_(callback)
7284
{
73-
if (!std::is_same<MessageT, CallbackMessageT>::value) {
74-
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
75-
}
76-
77-
// Create the intra-process buffer.
78-
buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
79-
buffer_type,
80-
qos_profile,
81-
allocator);
82-
83-
// Create the guard condition.
84-
rcl_guard_condition_options_t guard_condition_options =
85-
rcl_guard_condition_get_default_options();
86-
87-
gc_ = rcl_get_zero_initialized_guard_condition();
88-
rcl_ret_t ret = rcl_guard_condition_init(
89-
&gc_, context->get_rcl_context().get(), guard_condition_options);
90-
91-
if (RCL_RET_OK != ret) {
92-
throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
93-
}
94-
9585
TRACEPOINT(
9686
rclcpp_subscription_callback_added,
9787
static_cast<const void *>(this),
@@ -104,22 +94,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
10494
#endif
10595
}
10696

107-
~SubscriptionIntraProcess()
108-
{
109-
if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) {
110-
RCUTILS_LOG_ERROR_NAMED(
111-
"rclcpp",
112-
"Failed to destroy guard condition: %s",
113-
rcutils_get_error_string().str);
114-
}
115-
}
116-
117-
bool
118-
is_ready(rcl_wait_set_t * wait_set)
119-
{
120-
(void) wait_set;
121-
return buffer_->has_data();
122-
}
97+
virtual ~SubscriptionIntraProcess() = default;
12398

12499
std::shared_ptr<void>
125100
take_data()
@@ -128,9 +103,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
128103
MessageUniquePtr unique_msg;
129104

130105
if (any_callback_.use_take_shared_method()) {
131-
shared_msg = buffer_->consume_shared();
106+
shared_msg = this->buffer_->consume_shared();
132107
} else {
133-
unique_msg = buffer_->consume_unique();
108+
unique_msg = this->buffer_->consume_unique();
134109
}
135110
return std::static_pointer_cast<void>(
136111
std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
@@ -141,37 +116,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
141116

142117
void execute(std::shared_ptr<void> & data)
143118
{
144-
execute_impl<CallbackMessageT>(data);
145-
}
146-
147-
void
148-
provide_intra_process_message(ConstMessageSharedPtr message)
149-
{
150-
buffer_->add_shared(std::move(message));
151-
trigger_guard_condition();
152-
}
153-
154-
void
155-
provide_intra_process_message(MessageUniquePtr message)
156-
{
157-
buffer_->add_unique(std::move(message));
158-
trigger_guard_condition();
159-
}
160-
161-
bool
162-
use_take_shared_method() const
163-
{
164-
return buffer_->use_take_shared_method();
165-
}
166-
167-
private:
168-
void
169-
trigger_guard_condition()
170-
{
171-
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
172-
(void)ret;
119+
execute_impl<MessageT>(data);
173120
}
174121

122+
protected:
175123
template<typename T>
176124
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
177125
execute_impl(std::shared_ptr<void> & data)
@@ -206,7 +154,6 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
206154
}
207155

208156
AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
209-
BufferUniquePtr buffer_;
210157
};
211158

212159
} // namespace experimental

0 commit comments

Comments
 (0)