Skip to content

Commit

Permalink
UDP engine aborts on networking-related errors from socket syscalls (2)
Browse files Browse the repository at this point in the history
#2862 (#3640)

* UDP engine aborts on networking-related errors from socket syscalls #2862
  • Loading branch information
atomashpolskiy authored and bluca committed Aug 25, 2019
1 parent 7559d2d commit 2aa87c9
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 53 deletions.
29 changes: 14 additions & 15 deletions src/ip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,12 @@ int zmq::bind_to_device (fd_t s_, const std::string &bound_device_)
#ifdef ZMQ_HAVE_SO_BINDTODEVICE
int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE,
bound_device_.c_str (), bound_device_.length ());

#ifdef ZMQ_HAVE_WINDOWS
if (rc != SOCKET_ERROR)
return 0;
const int lastError = WSAGetLastError ();
errno = wsa_error_to_errno (lastError);
wsa_assert (lastError != WSAENOTSOCK);
return -1;
#else
if (rc == 0)
if (rc != 0) {
assert_success_or_recoverable (s_, rc);
return -1;
} else {
return 0;
errno_assert (errno != ENOTSOCK);
return -1;
#endif
}
#else
LIBZMQ_UNUSED (s_);
LIBZMQ_UNUSED (bound_device_);
Expand Down Expand Up @@ -682,10 +674,17 @@ void zmq::make_socket_noninheritable (fd_t sock_)
#endif
}

void zmq::assert_socket_tuning_error (zmq::fd_t s_, int rc_)
void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
{
if (rc_ == 0)
#ifdef ZMQ_HAVE_WINDOWS
if (rc_ != SOCKET_ERROR) {
return;
}
#else
if (rc_ != -1) {
return;
}
#endif

// Check whether an error occurred
int err = 0;
Expand Down
7 changes: 4 additions & 3 deletions src/ip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ int make_fdpair (fd_t *r_, fd_t *w_);
// Asserts on any failure.
void make_socket_noninheritable (fd_t sock_);

// Asserts that an internal error did not occur. Does not assert
// on network errors such as reset or aborted connections.
void assert_socket_tuning_error (fd_t s_, int rc_);
// Asserts that:
// - an internal 0MQ error did not occur,
// - and, if a socket error occured, it can be recovered from.
void assert_success_or_recoverable (fd_t s_, int rc_);
}

#endif
24 changes: 12 additions & 12 deletions src/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int zmq::tune_tcp_socket (fd_t s_)
int nodelay = 1;
int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *> (&nodelay), sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc != 0)
return rc;

Expand All @@ -71,7 +71,7 @@ int zmq::tune_tcp_socket (fd_t s_)
int nodelack = 1;
rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack,
sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
#endif
return rc;
}
Expand All @@ -81,7 +81,7 @@ int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
const int rc =
setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
assert_socket_tuning_error (sockfd_, rc);
assert_success_or_recoverable (sockfd_, rc);
return rc;
}

Expand All @@ -90,7 +90,7 @@ int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
const int rc =
setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
assert_socket_tuning_error (sockfd_, rc);
assert_success_or_recoverable (sockfd_, rc);
return rc;
}

Expand Down Expand Up @@ -123,7 +123,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
sizeof (keepalive_opts), NULL, 0,
&num_bytes_returned, NULL, NULL);
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc == SOCKET_ERROR)
return rc;
}
Expand All @@ -133,15 +133,15 @@ int zmq::tune_tcp_keepalives (fd_t s_,
int rc =
setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
reinterpret_cast<char *> (&keepalive_), sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc != 0)
return rc;

#ifdef ZMQ_HAVE_TCP_KEEPCNT
if (keepalive_cnt_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -151,7 +151,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
&keepalive_idle_, sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -160,7 +160,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
&keepalive_idle_, sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -171,7 +171,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_intvl_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
&keepalive_intvl_, sizeof (int));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -196,13 +196,13 @@ int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
int rc =
setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
reinterpret_cast<char *> (&timeout_), sizeof (timeout_));
assert_socket_tuning_error (sockfd_, rc);
assert_success_or_recoverable (sockfd_, rc);
return rc;
// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
#elif defined(TCP_USER_TIMEOUT)
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
sizeof (timeout_));
assert_socket_tuning_error (sockfd_, rc);
assert_success_or_recoverable (sockfd_, rc);
return rc;
#else
return 0;
Expand Down
73 changes: 50 additions & 23 deletions src/udp_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
int rc = 0;

// Bind the socket to a device if applicable
if (!_options.bound_device.empty ())
if (!_options.bound_device.empty ()) {
rc = rc | bind_to_device (_fd, _options.bound_device);
if (rc != 0) {
assert_success_or_recoverable (_fd, rc);
error (connection_error);
return;
}
}

if (_send_enabled) {
if (!_options.raw_socket) {
Expand Down Expand Up @@ -162,9 +168,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
if (multicast) {
// Multicast addresses should be allowed to bind to more than
// one port as all ports should receive the message
#ifdef SO_REUSEPORT
rc = rc | set_udp_reuse_port (_fd, true);
#endif

// In multicast we should bind ANY and use the mreq struct to
// specify the interface
Expand All @@ -175,6 +179,11 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
real_bind_addr = bind_addr;
}

if (rc != 0) {
error (protocol_error);
return;
}

#ifdef ZMQ_HAVE_VXWORKS
rc = rc
| bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
Expand All @@ -184,6 +193,11 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
| bind (_fd, real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
#endif
if (rc != 0) {
assert_success_or_recoverable (_fd, rc);
error (connection_error);
return;
}

if (multicast) {
rc = rc | add_membership (_fd, udp_addr);
Expand Down Expand Up @@ -224,7 +238,7 @@ int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_,
int loop = loop_ ? 1 : 0;
int rc = setsockopt (s_, level, optname, reinterpret_cast<char *> (&loop),
sizeof (loop));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
return rc;
}

Expand All @@ -240,7 +254,7 @@ int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_)

int rc = setsockopt (s_, level, IP_MULTICAST_TTL,
reinterpret_cast<char *> (&hops_), sizeof (hops_));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
return rc;
}

Expand Down Expand Up @@ -270,7 +284,7 @@ int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_,
}
}

assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
return rc;
}

Expand All @@ -279,7 +293,7 @@ int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)
int on = on_ ? 1 : 0;
int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
reinterpret_cast<char *> (&on), sizeof (on));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
return rc;
}

Expand All @@ -291,7 +305,7 @@ int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_)
int on = on_ ? 1 : 0;
int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT,
reinterpret_cast<char *> (&on), sizeof (on));
assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
return rc;
#endif
}
Expand Down Expand Up @@ -322,7 +336,7 @@ int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
reinterpret_cast<char *> (&mreq), sizeof (mreq));
}

assert_socket_tuning_error (s_, rc);
assert_success_or_recoverable (s_, rc);
return rc;
}

Expand Down Expand Up @@ -470,17 +484,28 @@ void zmq::udp_engine_t::out_event ()
#ifdef ZMQ_HAVE_WINDOWS
rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
_out_address_len);
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
(sockaddr *) _out_address, _out_address_len);
errno_assert (rc != -1);
#else
rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
errno_assert (rc != -1);
#endif
} else
if (rc < 0) {
#ifdef ZMQ_HAVE_WINDOWS
if (WSAGetLastError () != WSAEWOULDBLOCK) {
assert_success_or_recoverable (_fd, rc);
error (connection_error);
}
#else
if (rc != EWOULDBLOCK) {
assert_success_or_recoverable (_fd, rc);
error (connection_error);
}
#endif
}
} else {
reset_pollout (_handle);
}
}

const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const
Expand Down Expand Up @@ -510,20 +535,22 @@ void zmq::udp_engine_t::in_event ()
const int nbytes =
recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);

if (nbytes < 0) {
#ifdef ZMQ_HAVE_WINDOWS
if (nbytes == SOCKET_ERROR) {
const int last_error = WSAGetLastError ();
wsa_assert (last_error == WSAENETDOWN || last_error == WSAENETRESET
|| last_error == WSAEWOULDBLOCK);
return;
}
if (WSAGetLastError () != WSAEWOULDBLOCK) {
assert_success_or_recoverable (_fd, nbytes);
error (connection_error);
}
#else
if (nbytes == -1) {
errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
&& errno != ENOTSOCK);
if (nbytes != EWOULDBLOCK) {
assert_success_or_recoverable (_fd, nbytes);
error (connection_error);
}
#endif
return;
}
#endif

int rc;
int body_size;
int body_offset;
Expand Down

0 comments on commit 2aa87c9

Please sign in to comment.