From 18483c2515f09957c12b94b00e5282bb3453fc80 Mon Sep 17 00:00:00 2001 From: Shane Loretz Date: Tue, 24 Nov 2020 09:06:18 -0800 Subject: [PATCH] Revert "Discriminate when the Client has gone from when the Client has not completely matched (#479)" (#489) This reverts commit c78c31a825932d9cc6682120d06e6fcea733a3bb. Signed-off-by: Shane Loretz --- rmw_fastrtps_cpp/src/rmw_service.cpp | 2 +- .../custom_service_info.hpp | 206 ++++-------------- rmw_fastrtps_shared_cpp/src/rmw_response.cpp | 9 +- 3 files changed, 49 insertions(+), 168 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 8bfbf012c..7fd67d478 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -241,7 +241,7 @@ rmw_create_service( delete info->pub_listener_; } }); - info->pub_listener_ = new (std::nothrow) PatchedServicePubListener(); + info->pub_listener_ = new (std::nothrow) ServicePubListener(); if (!info->pub_listener_) { RMW_SET_ERROR_MSG("failed to create service response publisher listener"); return nullptr; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 1839e5a77..e2e7ff86a 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include "fastcdr/FastBuffer.h" @@ -38,15 +37,6 @@ class ServiceListener; class ServicePubListener; -class PatchedServicePubListener; - -enum class client_present_t -{ - FAILURE, // an error occurred when checking - MAYBE, // reader not matched, writer still present - YES, // reader matched - GONE // neither reader nor writer -}; typedef struct CustomServiceInfo { @@ -72,132 +62,6 @@ typedef struct CustomServiceRequest : buffer_(nullptr) {} } CustomServiceRequest; -class ServicePubListener : public eprosima::fastrtps::PublisherListener -{ -public: - ServicePubListener() = default; - - template - bool wait_for_subscription( - const eprosima::fastrtps::rtps::GUID_t & guid, - const std::chrono::duration & rel_time) - { - auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool - { - return subscriptions_.find(guid) != subscriptions_.end(); - }; - - std::unique_lock lock(mutex_); - return cv_.wait_for(lock, rel_time, guid_is_present); - } - - void onPublicationMatched( - eprosima::fastrtps::Publisher * pub, - eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) - { - (void) pub; - std::lock_guard lock(mutex_); - if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { - subscriptions_.insert(matchingInfo.remoteEndpointGuid); - } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { - subscriptions_.erase(matchingInfo.remoteEndpointGuid); - } else { - return; - } - cv_.notify_all(); - } - -protected: - std::mutex & getMutex() - { - return mutex_; - } - - std::unordered_set< - eprosima::fastrtps::rtps::GUID_t, - rmw_fastrtps_shared_cpp::hash_fastrtps_guid> & - getSubscriptions() - { - return subscriptions_; - } - - std::condition_variable & getConditionVariable() - { - return cv_; - } - -private: - using subscriptions_set_t = - std::unordered_set; - - std::mutex mutex_; - subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - std::condition_variable cv_; -}; - -// Wrapper around ServicePubListener to fix issue when matching clients in an ABI compatible way -// See original patch: https://github.com/ros2/rmw_fastrtps/pull/467 -class PatchedServicePubListener : public ServicePubListener -{ -public: - using clients_endpoints_map_t = - std::unordered_map; - - void onPublicationMatched( - eprosima::fastrtps::Publisher * pub, - eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) - { - (void) pub; - std::lock_guard lock(getMutex()); - std::unordered_set< - eprosima::fastrtps::rtps::GUID_t, - rmw_fastrtps_shared_cpp::hash_fastrtps_guid> & subscriptions_ = getSubscriptions(); - std::condition_variable & cv_ = getConditionVariable(); - if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { - subscriptions_.insert(matchingInfo.remoteEndpointGuid); - } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { - subscriptions_.erase(matchingInfo.remoteEndpointGuid); - auto endpoint = clients_endpoints_.find(matchingInfo.remoteEndpointGuid); - if (endpoint != clients_endpoints_.end()) { - clients_endpoints_.erase(endpoint->second); - clients_endpoints_.erase(matchingInfo.remoteEndpointGuid); - } - } else { - return; - } - cv_.notify_all(); - } - - client_present_t - check_for_subscription( - const eprosima::fastrtps::rtps::GUID_t & guid) - { - // Check if the guid is still in the map - if (clients_endpoints_.find(guid) == clients_endpoints_.end()) { - // Client is gone - return client_present_t::GONE; - } - // Wait for subscription - if (!wait_for_subscription(guid, std::chrono::milliseconds(100))) { - return client_present_t::MAYBE; - } - return client_present_t::YES; - } - - // Accesors - clients_endpoints_map_t & clients_endpoints() - { - std::lock_guard lock(getMutex()); - return clients_endpoints_; - } - -private: - clients_endpoints_map_t clients_endpoints_; -}; - class ServiceListener : public eprosima::fastrtps::SubscriberListener { public: @@ -208,23 +72,6 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener (void)info_; } - void - onSubscriptionMatched( - eprosima::fastrtps::Subscriber * sub, - eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) - { - (void) sub; - PatchedServicePubListener * pub_listener = static_cast( - info_->pub_listener_); - if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { - auto endpoint = pub_listener->clients_endpoints().find( - matchingInfo.remoteEndpointGuid); - if (endpoint != pub_listener->clients_endpoints().end()) { - pub_listener->clients_endpoints().erase(endpoint->second); - pub_listener->clients_endpoints().erase(matchingInfo.remoteEndpointGuid); - } - } - } void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) @@ -248,14 +95,6 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener request.sample_identity_.writer_guid() = reader_guid; } - // Save both guids in the clients_endpoints map - PatchedServicePubListener * pub_listener = - static_cast(info_->pub_listener_); - const eprosima::fastrtps::rtps::GUID_t & writer_guid = - request.sample_info_.sample_identity.writer_guid(); - pub_listener->clients_endpoints().emplace(reader_guid, writer_guid); - pub_listener->clients_endpoints().emplace(writer_guid, reader_guid); - std::lock_guard lock(internalMutex_); if (conditionMutex_ != nullptr) { @@ -330,4 +169,49 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; +class ServicePubListener : public eprosima::fastrtps::PublisherListener +{ +public: + ServicePubListener() = default; + + template + bool wait_for_subscription( + const eprosima::fastrtps::rtps::GUID_t & guid, + const std::chrono::duration & rel_time) + { + auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool + { + return subscriptions_.find(guid) != subscriptions_.end(); + }; + + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, rel_time, guid_is_present); + } + + void onPublicationMatched( + eprosima::fastrtps::Publisher * pub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) pub; + std::lock_guard lock(mutex_); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { + subscriptions_.insert(matchingInfo.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + subscriptions_.erase(matchingInfo.remoteEndpointGuid); + } else { + return; + } + cv_.notify_all(); + } + +private: + using subscriptions_set_t = + std::unordered_set; + + std::mutex mutex_; + subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::condition_variable cv_; +}; + #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp index d4a4d14ef..4ee96af24 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp @@ -118,13 +118,10 @@ __rmw_send_response( if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) { // Related guid is a reader, so it is the response subscription guid. // Wait for the response writer to be matched with it. - auto listener = static_cast(info->pub_listener_); - client_present_t ret = listener->check_for_subscription(related_guid); - if (ret == client_present_t::GONE) { - return RMW_RET_OK; - } else if (ret == client_present_t::MAYBE) { + auto listener = info->pub_listener_; + if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) { RMW_SET_ERROR_MSG("client will not receive response"); - return RMW_RET_TIMEOUT; + return RMW_RET_ERROR; } }