From e70a07d0c0810e52719be73ebbac2d1bf4a5d9fc Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 12 Oct 2021 12:23:57 -0700 Subject: [PATCH] use dynamic_pointer_cast to detect allocator mismatch in intra process manager (backport #1643) (#1645) * use dynamic_pointer_cast to detect allocator mismatch in intra process manager (#1643) * use dynamic_pointer_cast to detect allocator mismatch in intra process manager Signed-off-by: William Woodall * add test case for mismatched allocators Signed-off-by: William Woodall * forward template arguments to avoid mismatched types in intra process manager Signed-off-by: William Woodall * style fixes Signed-off-by: William Woodall * refactor to test message address and count, more DRY Signed-off-by: William Woodall * update copyright Signed-off-by: William Woodall * fix typo Signed-off-by: William Woodall Co-authored-by: Michel Hidalgo Co-authored-by: Michel Hidalgo (cherry picked from commit 79c2dd8e8b3e73366c59474b91271a2fd57954bc) # Conflicts: # rclcpp/include/rclcpp/experimental/intra_process_manager.hpp * fix conflicts Signed-off-by: Ivan Santiago Paunovic * cpp14 compatibility Signed-off-by: Ivan Santiago Paunovic * More cpp14 compat Signed-off-by: Ivan Santiago Paunovic * fix Signed-off-by: Ivan Santiago Paunovic * Fix bug Signed-off-by: Ivan Santiago Paunovic Co-authored-by: William Woodall Co-authored-by: Ivan Santiago Paunovic --- .../experimental/intra_process_manager.hpp | 37 ++- rclcpp/test/rclcpp/CMakeLists.txt | 7 + .../rclcpp/test_intra_process_manager.cpp | 5 +- ..._intra_process_manager_with_allocators.cpp | 303 ++++++++++++++++++ 4 files changed, 341 insertions(+), 11 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..b8bad62ae4 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -202,7 +202,8 @@ class IntraProcessManager // None of the buffers require ownership, so we promote the pointer std::shared_ptr msg = std::move(message); - this->template add_shared_msg_to_buffers(msg, sub_ids.take_shared_subscriptions); + this->template add_shared_msg_to_buffers( + msg, sub_ids.take_shared_subscriptions); } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT sub_ids.take_shared_subscriptions.size() <= 1) { @@ -227,7 +228,7 @@ class IntraProcessManager // for the buffers that do not require ownership auto shared_msg = std::allocate_shared(*allocator, *message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); @@ -263,7 +264,7 @@ class IntraProcessManager // If there are no owning, just convert to shared. std::shared_ptr shared_msg = std::move(message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } return shared_msg; @@ -273,7 +274,7 @@ class IntraProcessManager auto shared_msg = std::allocate_shared(*allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } @@ -350,7 +351,10 @@ class IntraProcessManager bool can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const; - template + template< + typename MessageT, + typename Alloc, + typename Deleter> void add_shared_msg_to_buffers( std::shared_ptr message, @@ -363,10 +367,16 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess + auto subscription = std::dynamic_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess >(subscription_base); - + if (nullptr == subscription) { + throw std::runtime_error( + "failed to dynamic cast SubscriptionIntraProcessBase to " + "SubscriptionIntraProcess, which " + "can happen when the publisher and subscription use different " + "allocator types, which is not supported"); + } subscription->provide_intra_process_message(message); } } @@ -391,9 +401,16 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess + auto subscription = std::dynamic_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess >(subscription_base); + if (nullptr == subscription) { + throw std::runtime_error( + "failed to dynamic cast SubscriptionIntraProcessBase to " + "SubscriptionIntraProcess, which " + "can happen when the publisher and subscription use different " + "allocator types, which is not supported"); + } if (std::next(it) == subscription_ids.end()) { // If this is the last subscription, give up ownership diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 5e2c9ca4e1..a94e0cdd73 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -134,6 +134,13 @@ if(TARGET test_intra_process_manager) ) target_link_libraries(test_intra_process_manager ${PROJECT_NAME}) endif() +ament_add_gmock(test_intra_process_manager_with_allocators test_intra_process_manager_with_allocators.cpp) +if(TARGET test_intra_process_manager_with_allocators) + ament_target_dependencies(test_intra_process_manager_with_allocators + "test_msgs" + ) + target_link_libraries(test_intra_process_manager_with_allocators ${PROJECT_NAME}) +endif() ament_add_gtest(test_ring_buffer_implementation test_ring_buffer_implementation.cpp) if(TARGET test_ring_buffer_implementation) ament_target_dependencies(test_ring_buffer_implementation diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index b71fb1d061..ec7ccc7e5a 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -216,7 +216,10 @@ class SubscriptionIntraProcessBase const char * topic_name; }; -template +template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> class SubscriptionIntraProcess : public SubscriptionIntraProcessBase { public: diff --git a/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp b/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp new file mode 100644 index 0000000000..dd844eb82d --- /dev/null +++ b/rclcpp/test/rclcpp/test_intra_process_manager_with_allocators.cpp @@ -0,0 +1,303 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "test_msgs/msg/empty.hpp" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/strategies/allocator_memory_strategy.hpp" + +// For demonstration purposes only, not necessary for allocator_traits +static uint32_t num_allocs = 0; +static uint32_t num_deallocs = 0; +// A very simple custom allocator. Counts calls to allocate and deallocate. +template +struct MyAllocator +{ +public: + using value_type = T; + using size_type = std::size_t; + using pointer = T *; + using const_pointer = const T *; + using difference_type = typename std::pointer_traits::difference_type; + + MyAllocator() noexcept + { + } + + ~MyAllocator() noexcept {} + + template + MyAllocator(const MyAllocator &) noexcept + { + } + + T * allocate(size_t size, const void * = 0) + { + if (size == 0) { + return nullptr; + } + num_allocs++; + return static_cast(std::malloc(size * sizeof(T))); + } + + void deallocate(T * ptr, size_t size) + { + (void)size; + if (!ptr) { + return; + } + num_deallocs++; + std::free(ptr); + } + + template + struct rebind + { + typedef MyAllocator other; + }; +}; + +template +constexpr bool operator==( + const MyAllocator &, + const MyAllocator &) noexcept +{ + return true; +} + +template +constexpr bool operator!=( + const MyAllocator &, + const MyAllocator &) noexcept +{ + return false; +} + +template< + typename ExpectedExceptionT, + typename PublisherT, + typename FutureT, + typename MessageT, + typename ExpectedMessagePtr, + typename std::enable_if::value, int>::type = 0> +void check_exception( + PublisherT & publisher, rclcpp::Executor & executor, FutureT received_message_future, + uint32_t & counter, MessageT msg, ExpectedMessagePtr expected_ptr) +{ + // no exception expected + EXPECT_NO_THROW( + { + publisher->publish(std::move(msg)); + executor.spin_until_future_complete(received_message_future, std::chrono::seconds(10)); + }); + EXPECT_EQ(expected_ptr, received_message_future.get().get()); + EXPECT_EQ(1u, counter); +} + +template< + typename ExpectedExceptionT, + typename PublisherT, + typename FutureT, + typename MessageT, + typename ExpectedMessagePtr, + typename std::enable_if::value, int>::type = 0> +void check_exception( + PublisherT & publisher, rclcpp::Executor & executor, FutureT received_message_future, + uint32_t counter, MessageT msg, ExpectedMessagePtr expected_ptr) +{ + (void)counter; + (void)expected_ptr; + // exception expected + EXPECT_THROW( + { + publisher->publish(std::move(msg)); + executor.spin_until_future_complete(received_message_future, std::chrono::seconds(10)); + }, ExpectedExceptionT); +} + +template< + typename PublishedMessageAllocatorT, + typename PublisherAllocatorT, + typename SubscribedMessageAllocatorT, + typename SubscriptionAllocatorT, + typename MessageMemoryStrategyAllocatorT, + typename MemoryStrategyAllocatorT, + typename ExpectedExceptionT +> +void +do_custom_allocator_test( + PublishedMessageAllocatorT published_message_allocator, + PublisherAllocatorT publisher_allocator, + SubscribedMessageAllocatorT /* subscribed_message_allocator */, // intentionally unused + SubscriptionAllocatorT subscription_allocator, + MessageMemoryStrategyAllocatorT message_memory_strategy, + MemoryStrategyAllocatorT memory_strategy_allocator) +{ + using PublishedMessageAllocTraits = + rclcpp::allocator::AllocRebind; + using PublishedMessageAlloc = typename PublishedMessageAllocTraits::allocator_type; + using PublishedMessageDeleter = + rclcpp::allocator::Deleter; + + using SubscribedMessageAllocTraits = + rclcpp::allocator::AllocRebind; + using SubscribedMessageAlloc = typename SubscribedMessageAllocTraits::allocator_type; + using SubscribedMessageDeleter = + rclcpp::allocator::Deleter; + + // init and node + auto context = std::make_shared(); + context->init(0, nullptr); + auto node = std::make_shared( + "custom_allocator_test", + rclcpp::NodeOptions().context(context).use_intra_process_comms(true)); + + // publisher + auto shared_publisher_allocator = std::make_shared(publisher_allocator); + rclcpp::PublisherOptionsWithAllocator publisher_options; + publisher_options.allocator = shared_publisher_allocator; + auto publisher = + node->create_publisher("custom_allocator_test", 10, publisher_options); + + // callback for subscription + uint32_t counter = 0; + std::promise> received_message; + auto received_message_future = received_message.get_future().share(); + auto callback = + [&counter, &received_message]( + std::unique_ptr msg) + { + ++counter; + received_message.set_value(std::move(msg)); + }; + + // subscription + auto shared_subscription_allocator = + std::make_shared(subscription_allocator); + rclcpp::SubscriptionOptionsWithAllocator subscription_options; + subscription_options.allocator = shared_subscription_allocator; + auto shared_message_strategy_allocator = + std::make_shared(message_memory_strategy); + auto msg_mem_strat = std::make_shared< + rclcpp::message_memory_strategy::MessageMemoryStrategy< + test_msgs::msg::Empty, + MessageMemoryStrategyAllocatorT + > + >(shared_message_strategy_allocator); + using CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type; + auto subscriber = node->create_subscription< + test_msgs::msg::Empty, + decltype(callback), + SubscriptionAllocatorT, + CallbackMessageT, + rclcpp::Subscription, + rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + MessageMemoryStrategyAllocatorT + > + >( + "custom_allocator_test", + 10, + std::forward(callback), + subscription_options, + msg_mem_strat); + + // executor memory strategy + using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy; + auto shared_memory_strategy_allocator = std::make_shared( + memory_strategy_allocator); + std::shared_ptr memory_strategy = + std::make_shared>( + shared_memory_strategy_allocator); + + // executor + rclcpp::ExecutorOptions options; + options.memory_strategy = memory_strategy; + options.context = context; + rclcpp::executors::SingleThreadedExecutor executor(options); + + executor.add_node(node); + + // rebind message_allocator to ensure correct type + PublishedMessageDeleter message_deleter; + PublishedMessageAlloc rebound_message_allocator = published_message_allocator; + rclcpp::allocator::set_allocator_for_deleter(&message_deleter, &rebound_message_allocator); + + // allocate a message + auto ptr = PublishedMessageAllocTraits::allocate(rebound_message_allocator, 1); + PublishedMessageAllocTraits::construct(rebound_message_allocator, ptr); + std::unique_ptr msg(ptr, message_deleter); + + // publisher and receive + check_exception( + publisher, executor, received_message_future, counter, std::move(msg), ptr); +} + +/* + This tests the case where a custom allocator is used correctly, i.e. the same + custom allocator on both sides. + */ +TEST(TestIntraProcessManagerWithAllocators, custom_allocator) { + using PublishedMessageAllocatorT = std::allocator; + using PublisherAllocatorT = std::allocator; + using SubscribedMessageAllocatorT = std::allocator; + using SubscriptionAllocatorT = std::allocator; + using MessageMemoryStrategyAllocatorT = std::allocator; + using MemoryStrategyAllocatorT = std::allocator; + auto allocator = std::allocator(); + do_custom_allocator_test< + PublishedMessageAllocatorT, + PublisherAllocatorT, + SubscribedMessageAllocatorT, + SubscriptionAllocatorT, + MessageMemoryStrategyAllocatorT, + MemoryStrategyAllocatorT, + void // no exception expected + >(allocator, allocator, allocator, allocator, allocator, allocator); +} + +/* + This tests the case where a custom allocator is used incorrectly, i.e. different + custom allocators on both sides. + */ +TEST(TestIntraProcessManagerWithAllocators, custom_allocator_wrong) { + // explicitly use a different allocator here to provoke a failure + using PublishedMessageAllocatorT = std::allocator; + using PublisherAllocatorT = std::allocator; + using SubscribedMessageAllocatorT = MyAllocator; + using SubscriptionAllocatorT = MyAllocator; + using MessageMemoryStrategyAllocatorT = MyAllocator; + using MemoryStrategyAllocatorT = std::allocator; + auto allocator = std::allocator(); + auto my_allocator = MyAllocator(); + do_custom_allocator_test< + PublishedMessageAllocatorT, + PublisherAllocatorT, + SubscribedMessageAllocatorT, + SubscriptionAllocatorT, + MessageMemoryStrategyAllocatorT, + MemoryStrategyAllocatorT, + std::runtime_error // expected exception + >(allocator, allocator, my_allocator, my_allocator, my_allocator, allocator); +}