From 10eff0affd954f4f4e755492a3c7b460a1772784 Mon Sep 17 00:00:00 2001 From: MiguelCompany Date: Tue, 25 Jun 2019 14:45:07 +0200 Subject: [PATCH] Fix closing multicast UDP channel with whitelist [5732] (#569) * Refs #5714. Added unit test. * Refs #3463. On releasing UDP resources, if the closing of the UDP fails 10 times, shut it down manually. * Refactor without sending close message * Refs #5714 remove UDPTransportInterface::ReleaseInputChannel * Wait until the UDPChannelResource::perform_listen_operation has joined before deleting the channel resource * Refs #5714 Simplify channel resource release mechanism. Eliminates condition variable and mutex --- .../fastrtps/transport/UDPChannelResource.h | 7 +- .../transport/UDPTransportInterface.h | 3 - src/cpp/transport/UDPChannelResource.cpp | 48 +++------- src/cpp/transport/UDPTransportInterface.cpp | 87 +------------------ test/unittest/transport/UDPv4Tests.cpp | 31 ++++++- 5 files changed, 48 insertions(+), 128 deletions(-) diff --git a/include/fastrtps/transport/UDPChannelResource.h b/include/fastrtps/transport/UDPChannelResource.h index 0d88d93bba4..2a106d6aef4 100644 --- a/include/fastrtps/transport/UDPChannelResource.h +++ b/include/fastrtps/transport/UDPChannelResource.h @@ -141,9 +141,7 @@ class UDPChannelResource : public ChannelResource ChannelResource::disable(); } - void release( - const Locator_t& locator, - const asio::ip::address& address); + void release(); protected: /** @@ -175,9 +173,6 @@ class UDPChannelResource : public ChannelResource bool only_multicast_purpose_; std::string interface_; UDPTransportInterface* transport_; - bool closing_; - std::mutex mtx_closing_; - std::condition_variable cv_closing_; UDPChannelResource(const UDPChannelResource&) = delete; UDPChannelResource& operator=(const UDPChannelResource&) = delete; diff --git a/include/fastrtps/transport/UDPTransportInterface.h b/include/fastrtps/transport/UDPTransportInterface.h index cf8aab517bf..5253c7e4416 100644 --- a/include/fastrtps/transport/UDPTransportInterface.h +++ b/include/fastrtps/transport/UDPTransportInterface.h @@ -64,9 +64,6 @@ class UDPTransportInterface : public TransportInterface SendResourceList& sender_resource_list, const Locator_t&) override; - //! Release the listening socket for the specified port. - bool ReleaseInputChannel(const Locator_t& locator, const asio::ip::address& interface_address); - /** * Converts a given remote locator (that is, a locator referring to a remote * destination) to the main local locator whose channel can write to that diff --git a/src/cpp/transport/UDPChannelResource.cpp b/src/cpp/transport/UDPChannelResource.cpp index 454c2e48301..1c3a4d7e9b2 100644 --- a/src/cpp/transport/UDPChannelResource.cpp +++ b/src/cpp/transport/UDPChannelResource.cpp @@ -35,7 +35,6 @@ UDPChannelResource::UDPChannelResource( , only_multicast_purpose_(false) , interface_(sInterface) , transport_(transport) - , closing_(false) { thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator)); } @@ -68,6 +67,8 @@ void UDPChannelResource::perform_listen_operation(Locator_t input_locator) logWarning(RTPS_MSG_IN, "Received Message, but no receiver attached"); } } + + message_receiver(nullptr); } bool UDPChannelResource::Receive( @@ -84,15 +85,9 @@ bool UDPChannelResource::Receive( receive_buffer_size = static_cast(bytes); if (receive_buffer_size > 0) { + // This is not necessary anymore but it's left here for back compatibility with versions older than 1.8.1 if (receive_buffer_size == 13 && memcmp(receive_buffer, "EPRORTPSCLOSE", 13) == 0) { - if (!alive()) - { - std::lock_guard lock(mtx_closing_); - closing_ = true; - message_receiver(nullptr); - cv_closing_.notify_all(); - } return false; } transport_->endpoint_to_locator(senderEndpoint, remote_locator); @@ -102,38 +97,23 @@ bool UDPChannelResource::Receive( catch (const std::exception& error) { (void)error; - logWarning(RTPS_MSG_OUT, "Error receiving data: " << error.what()); - std::cout << "+++ERROR: " << error.what() << " - " << message_receiver() << " (" << this << ")" << std::endl; + logWarning(RTPS_MSG_OUT, "Error receiving data: " << error.what() << " - " << message_receiver() + << " (" << this << ")"); return false; } } -void UDPChannelResource::release( - const Locator_t& locator, - const asio::ip::address& address) +void UDPChannelResource::release() { - if (!address.is_multicast()) - { - std::unique_lock lock(mtx_closing_); - uint32_t tries_ = 0; - while (!closing_) - { - transport_->ReleaseInputChannel(locator, address); - cv_closing_.wait_for(lock, std::chrono::milliseconds(5), - [this]{ - return closing_; - }); - ++tries_; - if (tries_ == 10) - { - logError(UDPChannelResource, "After " << tries_ << " retries UDP Socket doesn't close. Aborting."); - socket()->cancel(); - closing_ = true; - message_receiver(nullptr); - } - } - } + // Cancel all asynchronous operations associated with the socket. socket()->cancel(); + // Disable receives on the socket. + // shutdown always returns a 'shutdown: Transport endpoint is not connected' error, + // since the endpoint is indeed not connected. However, it unblocks the synchronous receive + // in Windows and Linux anyways, which is what we want. + asio::error_code ec; + socket()->shutdown(asio::socket_base::shutdown_type::shutdown_receive, ec); + // On OSX shutdown does not unblock the listening thread, but close does. socket()->close(); } diff --git a/src/cpp/transport/UDPTransportInterface.cpp b/src/cpp/transport/UDPTransportInterface.cpp index ae06a0a600f..90432c9eb96 100644 --- a/src/cpp/transport/UDPTransportInterface.cpp +++ b/src/cpp/transport/UDPTransportInterface.cpp @@ -84,28 +84,12 @@ bool UDPTransportInterface::CloseInputChannel(const Locator_t& locator) } - std::map addresses; - // It may sound redundant, but we must mark all the related channel to be killed first. - // Mostly in Windows, but in Linux can happen too, if we access to the endpoint - // of an already closed socket we get an exception. So we store the interface address to - // be used in the ReleaseInputChannel call later. - for (UDPChannelResource* channel_resource : channel_resources) - { - if (channel_resource->alive()) - { - addresses[channel_resource] = channel_resource->socket()->local_endpoint().address(); - } - else - { - addresses[channel_resource] = asio::ip::address(); - } - channel_resource->disable(); - } - - // Then we release the channels + // We now disable and release the channels for (UDPChannelResource* channel : channel_resources) { - channel->release(locator, addresses[channel]); + channel->disable(); + channel->release(); + channel->clear(); delete channel; } @@ -373,69 +357,6 @@ bool UDPTransportInterface::OpenOutputChannel( return true; } -bool UDPTransportInterface::ReleaseInputChannel(const Locator_t& locator, const asio::ip::address& interface_address) -{ - try - { - asio::error_code ec; - socket_base::message_flags flags = 0; - uint16_t port = IPLocator::getPhysicalPort(locator); - - if(is_interface_whitelist_empty()) - { - Locator_t localLocator; - fill_local_ip(localLocator); - - ip::udp::socket socket(io_service_); - socket.open(generate_protocol()); - socket.bind(generate_local_endpoint(localLocator, 0)); - socket.set_option(ip::multicast::enable_loopback(true)); - - // We first send directly to localhost, in case all network interfaces are disabled - // (which would mean that multicast traffic may not be sent) - // We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL) - auto localEndpoint = generate_local_endpoint(localLocator, port); - socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint, flags, ec); - - // We then send to the address of the input locator - auto destinationEndpoint = generate_local_endpoint(locator, port); - - // We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL) - socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint,flags, ec); - - socket.close(); - } - else if (!interface_address.is_multicast()) - { - ip::udp::socket socket(io_service_); - socket.open(generate_protocol()); - auto bound_endpoint = asio::ip::udp::endpoint(interface_address, 0); - socket.bind(bound_endpoint); - socket.set_option(ip::multicast::enable_loopback(true)); - SetSocketOutboundInterface(socket, bound_endpoint.address().to_string()); - - // We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL) - auto localEndpoint = ip::udp::endpoint(interface_address, port); - socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint, flags, ec); - - // We then send to the address of the input locator - auto destinationEndpoint = generate_local_endpoint(locator, port); - - // We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL) - socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint, flags, ec); - - socket.close(); - } - } - catch (const std::exception& error) - { - logError(RTPS_MSG_OUT, "Error " << error.what()); - return false; - } - - return true; -} - Locator_t UDPTransportInterface::RemoteToMainLocal(const Locator_t& remote) const { if (!IsLocatorSupported(remote)) diff --git a/test/unittest/transport/UDPv4Tests.cpp b/test/unittest/transport/UDPv4Tests.cpp index 6d1b42965f8..d9c91a7200e 100644 --- a/test/unittest/transport/UDPv4Tests.cpp +++ b/test/unittest/transport/UDPv4Tests.cpp @@ -332,7 +332,6 @@ TEST_F(UDPv4Tests, send_to_allowed_interface) if (IsAddressDefined(locator)) { - descriptor.interfaceWhiteList.emplace_back("127.0.0.1"); descriptor.interfaceWhiteList.emplace_back(IPLocator::toIPv4string(locator)); UDPv4Transport transportUnderTest(descriptor); transportUnderTest.init(); @@ -351,7 +350,6 @@ TEST_F(UDPv4Tests, send_to_allowed_interface) IPLocator::setIPv4(remoteMulticastLocator, 239, 255, 1, 4); // Loopback // Sending through a ALLOWED IP will work - IPLocator::setIPv4(outputChannelLocator, 127, 0, 0, 1); std::vector message = { 'H','e','l','l','o' }; ASSERT_TRUE(send_resource_list.at(0)->send(message.data(), (uint32_t)message.size(), remoteMulticastLocator)); @@ -521,6 +519,35 @@ TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets_using_unicast_to_mul senderThread->join(); sem.wait(); } + +TEST_F(UDPv4Tests, open_and_close_two_multicast_transports_with_whitelist) +{ + std::vector interfaces; + GetIP4s(interfaces); + + if (interfaces.size() > 0) + { + descriptor.interfaceWhiteList.push_back(interfaces.at(0).name); + + UDPv4Transport transport1(descriptor); + UDPv4Transport transport2(descriptor); + transport1.init(); + transport2.init(); + + Locator_t multicastLocator; + multicastLocator.port = g_default_port; + multicastLocator.kind = LOCATOR_KIND_UDPv4; + IPLocator::setIPv4(multicastLocator, "239.255.1.4"); + + std::cout << "Opening input channels" << std::endl; + ASSERT_TRUE(transport1.OpenInputChannel(multicastLocator, nullptr, 65500)); + ASSERT_TRUE(transport2.OpenInputChannel(multicastLocator, nullptr, 65500)); + std::cout << "Closing input channel on transport 1" << std::endl; + ASSERT_TRUE(transport1.CloseInputChannel(multicastLocator)); + std::cout << "Closing input channel on transport 2" << std::endl; + ASSERT_TRUE(transport2.CloseInputChannel(multicastLocator)); + } +} #endif TEST_F(UDPv4Tests, open_a_blocked_socket)