Skip to content

Commit

Permalink
vSomeIP 2.6.4
Browse files Browse the repository at this point in the history
  • Loading branch information
juergengehring committed Jun 20, 2017
1 parent cf67875 commit 2769830
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 110 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,7 @@ v2.6.3
- Introduce 'max-payload-size-reliable' json file parameter which can be used to
globally limit the maximum allowed payload size for TCP communication
- Added CRC checksum calculation for bit optimized messages

v2.6.4
- Fix bug in reboot detection of other nodes
- Improve restarting of TCP connections
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ project (vsomeip)

set (VSOMEIP_MAJOR_VERSION 2)
set (VSOMEIP_MINOR_VERSION 6)
set (VSOMEIP_PATCH_VERSION 3)
set (VSOMEIP_PATCH_VERSION 4)
set (VSOMEIP_VERSION ${VSOMEIP_MAJOR_VERSION}.${VSOMEIP_MINOR_VERSION}.${VSOMEIP_PATCH_VERSION})
set (PACKAGE_VERSION ${VSOMEIP_VERSION}) # Used in documentatin/doxygen.in
set (CMAKE_VERBOSE_MAKEFILE off)
Expand Down
6 changes: 4 additions & 2 deletions implementation/endpoints/include/client_endpoint_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class client_endpoint_impl: public endpoint_impl<Protocol>, public client_endpoi
bool flush();

void stop();
void restart();
virtual void restart() = 0;

bool is_client() const;

Expand All @@ -68,9 +68,11 @@ class client_endpoint_impl: public endpoint_impl<Protocol>, public client_endpoi
protected:
virtual void send_queued() = 0;
void shutdown_and_close_socket();
void shutdown_and_close_socket_unlocked();
void start_connect_timer();

mutable std::mutex socket_mutex_;
socket_type socket_;
std::unique_ptr<socket_type> socket_;
endpoint_type remote_;

boost::asio::steady_timer flush_timer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class tcp_client_endpoint_impl: public tcp_client_endpoint_base_impl {
virtual ~tcp_client_endpoint_impl();

void start();
void restart();

bool get_remote_address(boost::asio::ip::address &_address) const;
unsigned short get_local_port() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class udp_client_endpoint_impl: virtual public udp_client_endpoint_base_impl {
virtual ~udp_client_endpoint_impl();

void start();
void restart();

void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
Expand Down
52 changes: 21 additions & 31 deletions implementation/endpoints/src/client_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
boost::asio::io_service &_io,
std::uint32_t _max_message_size)
: endpoint_impl<Protocol>(_host, _local, _io, _max_message_size),
socket_(_io), remote_(_remote),
socket_(new socket_type(_io)), remote_(_remote),
flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
is_connected_(false),
Expand Down Expand Up @@ -70,7 +70,7 @@ void client_endpoint_impl<Protocol>::stop() {
bool is_open(false);
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
is_open = socket_.is_open();
is_open = socket_->is_open();
}
if (is_open) {
bool send_queue_empty(false);
Expand All @@ -91,24 +91,6 @@ void client_endpoint_impl<Protocol>::stop() {
shutdown_and_close_socket();
}

template<typename Protocol>
void client_endpoint_impl<Protocol>::restart() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
}
shutdown_and_close_socket();
is_connected_ = false;
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
std::chrono::milliseconds(connect_timeout_));
connect_timer_.async_wait(
std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
this->shared_from_this(), std::placeholders::_1));
}
}

template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
Expand Down Expand Up @@ -212,14 +194,7 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (its_host) {
if (_error && _error != boost::asio::error::already_connected) {
shutdown_and_close_socket();
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
std::chrono::milliseconds(connect_timeout_));
connect_timer_.async_wait(
std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
this->shared_from_this(), std::placeholders::_1));
}
start_connect_timer();
// Double the timeout as long as the maximum allowed is larger
if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
connect_timeout_ = (connect_timeout_ << 1);
Expand Down Expand Up @@ -302,10 +277,15 @@ void client_endpoint_impl<Protocol>::flush_cbk(
template<typename Protocol>
void client_endpoint_impl<Protocol>::shutdown_and_close_socket() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if (socket_.is_open()) {
shutdown_and_close_socket_unlocked();
}

template<typename Protocol>
void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked() {
if (socket_->is_open()) {
boost::system::error_code its_error;
socket_.shutdown(Protocol::socket::shutdown_both, its_error);
socket_.close(its_error);
socket_->shutdown(Protocol::socket::shutdown_both, its_error);
socket_->close(its_error);
}
}

Expand All @@ -321,6 +301,16 @@ unsigned short client_endpoint_impl<Protocol>::get_remote_port() const {
return 0;
}

template<typename Protocol>
void client_endpoint_impl<Protocol>::start_connect_timer() {
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
std::chrono::milliseconds(connect_timeout_));
connect_timer_.async_wait(
std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
this->shared_from_this(), std::placeholders::_1));
}

// Instantiate template
#ifndef _WIN32
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
Expand Down
23 changes: 15 additions & 8 deletions implementation/endpoints/src/local_client_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ bool local_client_endpoint_impl::is_local() const {
}

void local_client_endpoint_impl::restart() {
is_connected_ = false;
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = false;
queue_.clear();
}
client_endpoint_impl::restart();
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
shutdown_and_close_socket_unlocked();
socket_.reset(new socket_type(service_));
}
start_connect_timer();
}

void local_client_endpoint_impl::start() {
Expand All @@ -60,23 +67,23 @@ void local_client_endpoint_impl::connect() {
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
socket_.open(remote_.protocol(), its_error);
socket_->open(remote_.protocol(), its_error);

if (!its_error || its_error == boost::asio::error::already_open) {
socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
<< "couldn't enable SO_REUSEADDR: " << its_error.message();
}
socket_.connect(remote_, its_connect_error);
socket_->connect(remote_, its_connect_error);

// Credentials
#ifndef _WIN32
if (!its_connect_error) {
auto its_host = host_.lock();
if (its_host) {
if (its_host->get_configuration()->is_security_enabled()) {
credentials::send_credentials(socket_.native(),
credentials::send_credentials(socket_->native(),
its_host->get_client());
}
}
Expand All @@ -102,8 +109,8 @@ void local_client_endpoint_impl::connect() {
void local_client_endpoint_impl::receive() {
#ifndef _WIN32
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if (socket_.is_open()) {
socket_.async_receive(
if (socket_->is_open()) {
socket_->async_receive(
boost::asio::buffer(recv_buffer_),
std::bind(
&local_client_endpoint_impl::receive_cbk,
Expand Down Expand Up @@ -146,7 +153,7 @@ VSOMEIP_INFO << msg.str();
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::asio::async_write(
socket_,
*socket_,
bufs,
std::bind(
&client_endpoint_impl::send_cbk,
Expand Down
57 changes: 40 additions & 17 deletions implementation/endpoints/src/tcp_client_endpoint_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,37 @@ void tcp_client_endpoint_impl::start() {
connect();
}

void tcp_client_endpoint_impl::restart() {
is_connected_ = false;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
shutdown_and_close_socket_unlocked();
recv_buffer_size_ = 0;
recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
recv_buffer_.shrink_to_fit();
socket_.reset(new socket_type(service_));
}
{
std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
}
start_connect_timer();
}

void tcp_client_endpoint_impl::connect() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
socket_.open(remote_.protocol(), its_error);
socket_->open(remote_.protocol(), its_error);

if (!its_error || its_error == boost::asio::error::already_open) {
// Nagle algorithm off
socket_.set_option(ip::tcp::no_delay(true), its_error);
socket_->set_option(ip::tcp::no_delay(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't disable "
<< "Nagle algorithm: " << its_error.message();
}

socket_.set_option(boost::asio::socket_base::keep_alive(true), its_error);
socket_->set_option(boost::asio::socket_base::keep_alive(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
<< "keep_alive: " << its_error.message();
Expand All @@ -76,7 +93,7 @@ void tcp_client_endpoint_impl::connect() {
// Enable SO_REUSEADDR to avoid bind problems with services going offline
// and coming online again and the user has specified only a small number
// of ports in the clients section for one service instance
socket_.set_option(boost::asio::socket_base::reuse_address(true), its_error);
socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: couldn't enable "
<< "SO_REUSEADDR: " << its_error.message();
Expand All @@ -85,14 +102,14 @@ void tcp_client_endpoint_impl::connect() {
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
boost::system::error_code its_bind_error;
socket_.bind(local_, its_bind_error);
socket_->bind(local_, its_bind_error);
if(its_bind_error) {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
"Error binding socket: " << its_bind_error.message();
}
}

socket_.async_connect(
socket_->async_connect(
remote_,
std::bind(
&tcp_client_endpoint_base_impl::connect_cbk,
Expand All @@ -108,7 +125,7 @@ void tcp_client_endpoint_impl::connect() {

void tcp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if(socket_.is_open()) {
if(socket_->is_open()) {
const std::size_t its_capacity(recv_buffer_.capacity());
size_t buffer_size = its_capacity - recv_buffer_size_;
try {
Expand Down Expand Up @@ -137,7 +154,7 @@ void tcp_client_endpoint_impl::receive() {
// don't start receiving again
return;
}
socket_.async_receive(
socket_->async_receive(
boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
std::bind(
&tcp_client_endpoint_impl::receive_cbk,
Expand Down Expand Up @@ -172,7 +189,7 @@ void tcp_client_endpoint_impl::send_queued() {
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::asio::async_write(
socket_,
*socket_,
boost::asio::buffer(*its_buffer),
std::bind(
&tcp_client_endpoint_base_impl::send_cbk,
Expand All @@ -196,8 +213,8 @@ bool tcp_client_endpoint_impl::get_remote_address(
unsigned short tcp_client_endpoint_impl::get_local_port() const {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
boost::system::error_code its_error;
if (socket_.is_open()) {
endpoint_type its_endpoint = socket_.local_endpoint(its_error);
if (socket_->is_open()) {
endpoint_type its_endpoint = socket_->local_endpoint(its_error);
if (!its_error) {
return its_endpoint.port();
} else {
Expand Down Expand Up @@ -248,6 +265,10 @@ void tcp_client_endpoint_impl::receive_cbk(
<< (int) recv_buffer_[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
std::unique_lock<std::mutex> its_lock(socket_mutex_);
if (!is_connected_) {
return;
}
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
if (!_error && 0 < _bytes) {
Expand Down Expand Up @@ -369,14 +390,16 @@ void tcp_client_endpoint_impl::receive_cbk(
missing_capacity_ = 0;
}
}
its_lock.unlock();
receive();
} else {
if (_error == boost::asio::error::connection_reset ||
_error == boost::asio::error::eof ||
_error == boost::asio::error::timed_out) {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk error detected: " << _error.message();
shutdown_and_close_socket();
shutdown_and_close_socket_unlocked();
} else {
its_lock.unlock();
receive();
}
}
Expand Down Expand Up @@ -413,8 +436,8 @@ const std::string tcp_client_endpoint_impl::get_address_port_local() const {
std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
if (socket_.is_open()) {
endpoint_type its_local_endpoint = socket_.local_endpoint(ec);
if (socket_->is_open()) {
endpoint_type its_local_endpoint = socket_->local_endpoint(ec);
if (!ec) {
its_address_port += its_local_endpoint.address().to_string(ec);
its_address_port += ":";
Expand Down Expand Up @@ -455,10 +478,10 @@ void tcp_client_endpoint_impl::handle_recv_buffer_exception(
boost::system::error_code ec;
connect_timer_.cancel(ec);
}
if (socket_.is_open()) {
if (socket_->is_open()) {
boost::system::error_code its_error;
socket_.shutdown(socket_type::shutdown_both, its_error);
socket_.close(its_error);
socket_->shutdown(socket_type::shutdown_both, its_error);
socket_->close(its_error);
}
}

Expand Down
Loading

0 comments on commit 2769830

Please sign in to comment.