Skip to content

Commit

Permalink
Fix closing multicast UDP channel with whitelist [5732] (#569)
Browse files Browse the repository at this point in the history
* Refs #5714. Added unit test.

* Refs #3463. On releasing UDP resources, if the closing of the UDP fails 10 times, shut it down manually.

* Refactor without sending close message

* Refs #5714 remove UDPTransportInterface::ReleaseInputChannel

* Wait until the UDPChannelResource::perform_listen_operation has joined before deleting the channel resource

* Refs #5714 Simplify channel resource release mechanism. Eliminates condition variable and mutex
  • Loading branch information
MiguelCompany authored and raquelalvarezbanos committed Jun 25, 2019
1 parent bd1161a commit 10eff0a
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 128 deletions.
7 changes: 1 addition & 6 deletions include/fastrtps/transport/UDPChannelResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ class UDPChannelResource : public ChannelResource
ChannelResource::disable();
}

void release(
const Locator_t& locator,
const asio::ip::address& address);
void release();

protected:
/**
Expand Down Expand Up @@ -175,9 +173,6 @@ class UDPChannelResource : public ChannelResource
bool only_multicast_purpose_;
std::string interface_;
UDPTransportInterface* transport_;
bool closing_;
std::mutex mtx_closing_;
std::condition_variable cv_closing_;

UDPChannelResource(const UDPChannelResource&) = delete;
UDPChannelResource& operator=(const UDPChannelResource&) = delete;
Expand Down
3 changes: 0 additions & 3 deletions include/fastrtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ class UDPTransportInterface : public TransportInterface
SendResourceList& sender_resource_list,
const Locator_t&) override;

//! Release the listening socket for the specified port.
bool ReleaseInputChannel(const Locator_t& locator, const asio::ip::address& interface_address);

/**
* Converts a given remote locator (that is, a locator referring to a remote
* destination) to the main local locator whose channel can write to that
Expand Down
48 changes: 14 additions & 34 deletions src/cpp/transport/UDPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ UDPChannelResource::UDPChannelResource(
, only_multicast_purpose_(false)
, interface_(sInterface)
, transport_(transport)
, closing_(false)
{
thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator));
}
Expand Down Expand Up @@ -68,6 +67,8 @@ void UDPChannelResource::perform_listen_operation(Locator_t input_locator)
logWarning(RTPS_MSG_IN, "Received Message, but no receiver attached");
}
}

message_receiver(nullptr);
}

bool UDPChannelResource::Receive(
Expand All @@ -84,15 +85,9 @@ bool UDPChannelResource::Receive(
receive_buffer_size = static_cast<uint32_t>(bytes);
if (receive_buffer_size > 0)
{
// This is not necessary anymore but it's left here for back compatibility with versions older than 1.8.1
if (receive_buffer_size == 13 && memcmp(receive_buffer, "EPRORTPSCLOSE", 13) == 0)
{
if (!alive())
{
std::lock_guard<std::mutex> lock(mtx_closing_);
closing_ = true;
message_receiver(nullptr);
cv_closing_.notify_all();
}
return false;
}
transport_->endpoint_to_locator(senderEndpoint, remote_locator);
Expand All @@ -102,38 +97,23 @@ bool UDPChannelResource::Receive(
catch (const std::exception& error)
{
(void)error;
logWarning(RTPS_MSG_OUT, "Error receiving data: " << error.what());
std::cout << "+++ERROR: " << error.what() << " - " << message_receiver() << " (" << this << ")" << std::endl;
logWarning(RTPS_MSG_OUT, "Error receiving data: " << error.what() << " - " << message_receiver()
<< " (" << this << ")");
return false;
}
}

void UDPChannelResource::release(
const Locator_t& locator,
const asio::ip::address& address)
void UDPChannelResource::release()
{
if (!address.is_multicast())
{
std::unique_lock<std::mutex> lock(mtx_closing_);
uint32_t tries_ = 0;
while (!closing_)
{
transport_->ReleaseInputChannel(locator, address);
cv_closing_.wait_for(lock, std::chrono::milliseconds(5),
[this]{
return closing_;
});
++tries_;
if (tries_ == 10)
{
logError(UDPChannelResource, "After " << tries_ << " retries UDP Socket doesn't close. Aborting.");
socket()->cancel();
closing_ = true;
message_receiver(nullptr);
}
}
}
// Cancel all asynchronous operations associated with the socket.
socket()->cancel();
// Disable receives on the socket.
// shutdown always returns a 'shutdown: Transport endpoint is not connected' error,
// since the endpoint is indeed not connected. However, it unblocks the synchronous receive
// in Windows and Linux anyways, which is what we want.
asio::error_code ec;
socket()->shutdown(asio::socket_base::shutdown_type::shutdown_receive, ec);
// On OSX shutdown does not unblock the listening thread, but close does.
socket()->close();
}

Expand Down
87 changes: 4 additions & 83 deletions src/cpp/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,28 +84,12 @@ bool UDPTransportInterface::CloseInputChannel(const Locator_t& locator)

}

std::map<UDPChannelResource*, asio::ip::address> addresses;
// It may sound redundant, but we must mark all the related channel to be killed first.
// Mostly in Windows, but in Linux can happen too, if we access to the endpoint
// of an already closed socket we get an exception. So we store the interface address to
// be used in the ReleaseInputChannel call later.
for (UDPChannelResource* channel_resource : channel_resources)
{
if (channel_resource->alive())
{
addresses[channel_resource] = channel_resource->socket()->local_endpoint().address();
}
else
{
addresses[channel_resource] = asio::ip::address();
}
channel_resource->disable();
}

// Then we release the channels
// We now disable and release the channels
for (UDPChannelResource* channel : channel_resources)
{
channel->release(locator, addresses[channel]);
channel->disable();
channel->release();
channel->clear();
delete channel;
}

Expand Down Expand Up @@ -373,69 +357,6 @@ bool UDPTransportInterface::OpenOutputChannel(
return true;
}

bool UDPTransportInterface::ReleaseInputChannel(const Locator_t& locator, const asio::ip::address& interface_address)
{
try
{
asio::error_code ec;
socket_base::message_flags flags = 0;
uint16_t port = IPLocator::getPhysicalPort(locator);

if(is_interface_whitelist_empty())
{
Locator_t localLocator;
fill_local_ip(localLocator);

ip::udp::socket socket(io_service_);
socket.open(generate_protocol());
socket.bind(generate_local_endpoint(localLocator, 0));
socket.set_option(ip::multicast::enable_loopback(true));

// We first send directly to localhost, in case all network interfaces are disabled
// (which would mean that multicast traffic may not be sent)
// We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL)
auto localEndpoint = generate_local_endpoint(localLocator, port);
socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint, flags, ec);

// We then send to the address of the input locator
auto destinationEndpoint = generate_local_endpoint(locator, port);

// We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL)
socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint,flags, ec);

socket.close();
}
else if (!interface_address.is_multicast())
{
ip::udp::socket socket(io_service_);
socket.open(generate_protocol());
auto bound_endpoint = asio::ip::udp::endpoint(interface_address, 0);
socket.bind(bound_endpoint);
socket.set_option(ip::multicast::enable_loopback(true));
SetSocketOutboundInterface(socket, bound_endpoint.address().to_string());

// We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL)
auto localEndpoint = ip::udp::endpoint(interface_address, port);
socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint, flags, ec);

// We then send to the address of the input locator
auto destinationEndpoint = generate_local_endpoint(locator, port);

// We ignore the error message because some OS don't allow this functionality like Windows (WSAENETUNREACH) or Mac (EADDRNOTAVAIL)
socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint, flags, ec);

socket.close();
}
}
catch (const std::exception& error)
{
logError(RTPS_MSG_OUT, "Error " << error.what());
return false;
}

return true;
}

Locator_t UDPTransportInterface::RemoteToMainLocal(const Locator_t& remote) const
{
if (!IsLocatorSupported(remote))
Expand Down
31 changes: 29 additions & 2 deletions test/unittest/transport/UDPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ TEST_F(UDPv4Tests, send_to_allowed_interface)

if (IsAddressDefined(locator))
{
descriptor.interfaceWhiteList.emplace_back("127.0.0.1");
descriptor.interfaceWhiteList.emplace_back(IPLocator::toIPv4string(locator));
UDPv4Transport transportUnderTest(descriptor);
transportUnderTest.init();
Expand All @@ -351,7 +350,6 @@ TEST_F(UDPv4Tests, send_to_allowed_interface)
IPLocator::setIPv4(remoteMulticastLocator, 239, 255, 1, 4); // Loopback

// Sending through a ALLOWED IP will work
IPLocator::setIPv4(outputChannelLocator, 127, 0, 0, 1);
std::vector<octet> message = { 'H','e','l','l','o' };
ASSERT_TRUE(send_resource_list.at(0)->send(message.data(), (uint32_t)message.size(),
remoteMulticastLocator));
Expand Down Expand Up @@ -521,6 +519,35 @@ TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets_using_unicast_to_mul
senderThread->join();
sem.wait();
}

TEST_F(UDPv4Tests, open_and_close_two_multicast_transports_with_whitelist)
{
std::vector<IPFinder::info_IP> interfaces;
GetIP4s(interfaces);

if (interfaces.size() > 0)
{
descriptor.interfaceWhiteList.push_back(interfaces.at(0).name);

UDPv4Transport transport1(descriptor);
UDPv4Transport transport2(descriptor);
transport1.init();
transport2.init();

Locator_t multicastLocator;
multicastLocator.port = g_default_port;
multicastLocator.kind = LOCATOR_KIND_UDPv4;
IPLocator::setIPv4(multicastLocator, "239.255.1.4");

std::cout << "Opening input channels" << std::endl;
ASSERT_TRUE(transport1.OpenInputChannel(multicastLocator, nullptr, 65500));
ASSERT_TRUE(transport2.OpenInputChannel(multicastLocator, nullptr, 65500));
std::cout << "Closing input channel on transport 1" << std::endl;
ASSERT_TRUE(transport1.CloseInputChannel(multicastLocator));
std::cout << "Closing input channel on transport 2" << std::endl;
ASSERT_TRUE(transport2.CloseInputChannel(multicastLocator));
}
}
#endif

TEST_F(UDPv4Tests, open_a_blocked_socket)
Expand Down

0 comments on commit 10eff0a

Please sign in to comment.