Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TCP infinite loop, client shutdown and reconnection [13540][13718][13721] #2470

Merged
merged 11 commits into from
Feb 10, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void HelloWorldSubscriber::run(
uint32_t number)
{
std::cout << "[RTCP] Subscriber running until " << number << "samples have been received" << std::endl;
while (number < this->listener_.samples_)
while (number > this->listener_.samples_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ void TCPChannelResourceBasic::disconnect()
{
auto socket = socket_;

std::error_code ec;
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);

service_.post([&, socket]()
{
try
{
std::error_code ec;
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket->cancel();

// This method was added on the version 1.12.0
Expand All @@ -125,7 +126,6 @@ void TCPChannelResourceBasic::disconnect()
{
}
});

}
}

Expand Down
35 changes: 31 additions & 4 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ bool TCPTransportInterface::CloseInputChannel(
void TCPTransportInterface::close_tcp_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
channel->disable();
channel->disconnect();
// channel.reset(); lead to race conditions because TransportInterface functions used in the callbacks doesn't check validity.
}

Expand Down Expand Up @@ -614,6 +614,20 @@ bool TCPTransportInterface::OpenOutputChannel(
//TODO Review with wan ip.
if (tcp_sender_resource && physical_locator == tcp_sender_resource->channel()->locator())
{
// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(physical_locator);
// If the channel exists, check if the channel reference in the sender resource needs to be updated with
// the found channel
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
tcp_sender_resource->channel()->clear();
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
// Add logical port to channel if it's not there yet
if (!tcp_sender_resource->channel()->is_logical_port_added(logical_port))
{
tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get());
Expand Down Expand Up @@ -911,6 +925,10 @@ bool receive_header(
{
return false;
}
else if (!channel->connection_status())
{
return false;
}
}

bytes_needed = TCPHeader::size() - 4;
Expand All @@ -926,6 +944,10 @@ bool receive_header(
{
return false;
}
else if (!channel->connection_status())
{
return false;
}
}

return true;
Expand Down Expand Up @@ -961,17 +983,22 @@ bool TCPTransportInterface::Receive(
do
{
header_found = receive_header(channel, tcp_header, ec);
} while (!header_found && !ec);
} while (!header_found && !ec && channel->connection_status());

if (ec)
{
if (ec != asio::error::eof)
{
logWarning(DEBUG, "Error reading TCP header: " << ec.message());
logWarning(DEBUG, "Failed to read TCP header: " << ec.message());
}
close_tcp_socket(channel);
success = false;
}
else if (!channel->connection_status())
{
logWarning(DEBUG, "Failed to read TCP header: channel disconnected while reading.");
success = false;
}
else
{
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());
Expand Down Expand Up @@ -1351,7 +1378,7 @@ void TCPTransportInterface::SocketConnected(
}
else
{
channel->disable();
channel->disconnect();
}
}
}
Expand Down
61 changes: 61 additions & 0 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,67 @@ TEST_P(TransportTCP, TCPv6_copy)
EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport);
}

// Test connection is successfully restablished after dropping and relaunching a TCP client (requester)
// Issue -> https://github.com/eProsima/Fast-DDS/issues/2409
TEST(TransportTCP, Client_reconnection)
{
TCPReqRepHelloWorldReplier* replier;
TCPReqRepHelloWorldRequester* requester;
const uint16_t nmsgs = 5;

replier = new TCPReqRepHelloWorldReplier;
replier->init(1, 0, global_port);

ASSERT_TRUE(replier->isInitialized());

requester = new TCPReqRepHelloWorldRequester;
requester->init(0, 0, global_port);

ASSERT_TRUE(requester->isInitialized());

// Wait for discovery.
replier->wait_discovery();
requester->wait_discovery();

ASSERT_TRUE(replier->is_matched());
ASSERT_TRUE(requester->is_matched());

for (uint16_t count = 0; count < nmsgs; ++count)
{
requester->send(count);
requester->block();
}

// Release TCP client resources.
delete requester;

// Wait until unmatched.
replier->wait_unmatched();
ASSERT_FALSE(replier->is_matched());

// Create new TCP client instance.
requester = new TCPReqRepHelloWorldRequester;
requester->init(0, 0, global_port);

ASSERT_TRUE(requester->isInitialized());

// Wait for discovery.
replier->wait_discovery();
requester->wait_discovery();

ASSERT_TRUE(replier->is_matched());
ASSERT_TRUE(requester->is_matched());

for (uint16_t count = 0; count < nmsgs; ++count)
{
requester->send(count);
requester->block();
}

delete replier;
delete requester;
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
35 changes: 35 additions & 0 deletions test/blackbox/common/TCPReqRepHelloWorldReplier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,31 @@ void TCPReqRepHelloWorldReplier::wait_discovery(
std::cout << "Replier discovery phase finished" << std::endl;
}

void TCPReqRepHelloWorldReplier::wait_unmatched(
std::chrono::seconds timeout)
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Replier waiting until being unmatched..." << std::endl;

if (timeout == std::chrono::seconds::zero())
{
cvDiscovery_.wait(lock, [&]()
{
return !is_matched();
});
}
else
{
cvDiscovery_.wait_for(lock, timeout, [&]()
{
return !is_matched();
});
}

std::cout << "Replier unmatched" << std::endl;
}

void TCPReqRepHelloWorldReplier::matched()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);
Expand All @@ -186,6 +211,16 @@ void TCPReqRepHelloWorldReplier::matched()
}
}

void TCPReqRepHelloWorldReplier::unmatched()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);
--matched_;
if (!is_matched())
{
cvDiscovery_.notify_one();
}
}

bool TCPReqRepHelloWorldReplier::is_matched()
{
return matched_ > 1;
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/common/TCPReqRepHelloWorldReplier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class TCPReqRepHelloWorldReplier
{
replier_.matched();
}
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
{
replier_.unmatched();
}
}

private:
Expand Down Expand Up @@ -132,7 +136,10 @@ class TCPReqRepHelloWorldReplier
uint16_t number);
void wait_discovery(
std::chrono::seconds timeout = std::chrono::seconds::zero());
void wait_unmatched(
std::chrono::seconds timeout = std::chrono::seconds::zero());
void matched();
void unmatched();
bool is_matched();

virtual void configSubscriber(
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/common/TCPReqRepHelloWorldRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ void TCPReqRepHelloWorldRequester::matched()
}
}

void TCPReqRepHelloWorldRequester::unmatched()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);
--matched_;
}

bool TCPReqRepHelloWorldRequester::is_matched()
{
return matched_ > 1;
Expand Down
5 changes: 5 additions & 0 deletions test/blackbox/common/TCPReqRepHelloWorldRequester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class TCPReqRepHelloWorldRequester
{
requester_.matched();
}
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING)
{
requester_.unmatched();
}
}

private:
Expand Down Expand Up @@ -136,6 +140,7 @@ class TCPReqRepHelloWorldRequester
void wait_discovery(
std::chrono::seconds timeout = std::chrono::seconds::zero());
void matched();
void unmatched();
void send(
const uint16_t number);
bool is_matched();
Expand Down
3 changes: 3 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ list(APPEND TEST_DEFINITIONS
zero_copy_pub_communication
zero_copy_sub_communication
mix_zero_copy_communication
close_TCP_client
)


Expand All @@ -118,6 +119,8 @@ list(APPEND XML_CONFIGURATION_FILES
simple_besteffort.xml
simple_reliable_zerocopy.xml
simple_besteffort_zerocopy.xml
TCP_server.xml
TCP_client.xml
)
# Python file
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test_build.py
Expand Down
3 changes: 1 addition & 2 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ bool PublisherModule::init(
{
std::cout << "Initializing Publisher" << std::endl;

DomainParticipantQos participant_qos;
participant_ =
DomainParticipantFactory::get_instance()->create_participant(seed % 230, participant_qos, this);
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this);

if (participant_ == nullptr)
{
Expand Down
4 changes: 2 additions & 2 deletions test/dds/communication/SubscriberModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ bool SubscriberModule::init(
<< StatusMask::data_available()
<< StatusMask::liveliness_changed();

DomainParticipantQos participant_qos;
participant_ =
DomainParticipantFactory::get_instance()->create_participant(seed % 230, participant_qos, this, mask);
DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this,
mask);

if (participant_ == nullptr)
{
Expand Down
30 changes: 30 additions & 0 deletions test/dds/communication/TCP_client.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8" ?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
<profiles>
<transport_descriptors>
<transport_descriptor>
<transport_id>tcp_transport_client</transport_id>
<type>TCPv4</type>
</transport_descriptor>
</transport_descriptors>

<participant profile_name="TCPClient" is_default_profile="true">
<rtps>
<userTransports>
<transport_id>tcp_transport_client</transport_id>
</userTransports>
<useBuiltinTransports>false</useBuiltinTransports>
<builtin>
<initialPeersList>
<locator>
<tcpv4>
<address>127.0.0.1</address>
<physical_port>5100</physical_port>
</tcpv4>
</locator>
</initialPeersList>
</builtin>
</rtps>
</participant>
</profiles>
</dds>
24 changes: 24 additions & 0 deletions test/dds/communication/TCP_server.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="utf-8" ?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
<profiles>
<transport_descriptors>
<transport_descriptor>
<transport_id>tcp_transport_server</transport_id>
<type>TCPv4</type>
<listening_ports>
<port>5100</port>
</listening_ports>
<wan_addr>127.0.0.1</wan_addr>
</transport_descriptor>
</transport_descriptors>

<participant profile_name="TCPServer" is_default_profile="true">
<rtps>
<userTransports>
<transport_id>tcp_transport_server</transport_id>
</userTransports>
<useBuiltinTransports>false</useBuiltinTransports>
</rtps>
</participant>
</profiles>
</dds>
Loading