diff --git a/src/ip.cpp b/src/ip.cpp index ca73e4f694..db8bf0ca91 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -234,20 +234,12 @@ int zmq::bind_to_device (fd_t s_, const std::string &bound_device_) #ifdef ZMQ_HAVE_SO_BINDTODEVICE int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE, bound_device_.c_str (), bound_device_.length ()); - -#ifdef ZMQ_HAVE_WINDOWS - if (rc != SOCKET_ERROR) - return 0; - const int lastError = WSAGetLastError (); - errno = wsa_error_to_errno (lastError); - wsa_assert (lastError != WSAENOTSOCK); - return -1; -#else - if (rc == 0) + if (rc != 0) { + assert_success_or_recoverable (s_, rc); + return -1; + } else { return 0; - errno_assert (errno != ENOTSOCK); - return -1; -#endif + } #else LIBZMQ_UNUSED (s_); LIBZMQ_UNUSED (bound_device_); @@ -682,10 +674,17 @@ void zmq::make_socket_noninheritable (fd_t sock_) #endif } -void zmq::assert_socket_tuning_error (zmq::fd_t s_, int rc_) +void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_) { - if (rc_ == 0) +#ifdef ZMQ_HAVE_WINDOWS + if (rc_ != SOCKET_ERROR) { + return; + } +#else + if (rc_ != -1) { return; + } +#endif // Check whether an error occurred int err = 0; diff --git a/src/ip.hpp b/src/ip.hpp index f9be650096..15e038c48e 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -72,9 +72,10 @@ int make_fdpair (fd_t *r_, fd_t *w_); // Asserts on any failure. void make_socket_noninheritable (fd_t sock_); -// Asserts that an internal error did not occur. Does not assert -// on network errors such as reset or aborted connections. -void assert_socket_tuning_error (fd_t s_, int rc_); +// Asserts that: +// - an internal 0MQ error did not occur, +// - and, if a socket error occured, it can be recovered from. +void assert_success_or_recoverable (fd_t s_, int rc_); } #endif diff --git a/src/tcp.cpp b/src/tcp.cpp index aa0b998cd5..237115db67 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -62,7 +62,7 @@ int zmq::tune_tcp_socket (fd_t s_) int nodelay = 1; int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast (&nodelay), sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc != 0) return rc; @@ -71,7 +71,7 @@ int zmq::tune_tcp_socket (fd_t s_) int nodelack = 1; rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack, sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); #endif return rc; } @@ -81,7 +81,7 @@ int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, reinterpret_cast (&bufsize_), sizeof bufsize_); - assert_socket_tuning_error (sockfd_, rc); + assert_success_or_recoverable (sockfd_, rc); return rc; } @@ -90,7 +90,7 @@ int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, reinterpret_cast (&bufsize_), sizeof bufsize_); - assert_socket_tuning_error (sockfd_, rc); + assert_success_or_recoverable (sockfd_, rc); return rc; } @@ -123,7 +123,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts, sizeof (keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc == SOCKET_ERROR) return rc; } @@ -133,7 +133,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast (&keepalive_), sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc != 0) return rc; @@ -141,7 +141,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_cnt_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_, sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc != 0) return rc; } @@ -151,7 +151,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_idle_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle_, sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc != 0) return rc; } @@ -160,7 +160,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_idle_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE, &keepalive_idle_, sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc != 0) return rc; } @@ -171,7 +171,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_intvl_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL, &keepalive_intvl_, sizeof (int)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); if (rc != 0) return rc; } @@ -196,13 +196,13 @@ int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, reinterpret_cast (&timeout_), sizeof (timeout_)); - assert_socket_tuning_error (sockfd_, rc); + assert_success_or_recoverable (sockfd_, rc); return rc; // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT #elif defined(TCP_USER_TIMEOUT) int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_, sizeof (timeout_)); - assert_socket_tuning_error (sockfd_, rc); + assert_success_or_recoverable (sockfd_, rc); return rc; #else return 0; diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 7009b1e590..2f0da1e678 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -119,8 +119,14 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) int rc = 0; // Bind the socket to a device if applicable - if (!_options.bound_device.empty ()) + if (!_options.bound_device.empty ()) { rc = rc | bind_to_device (_fd, _options.bound_device); + if (rc != 0) { + assert_success_or_recoverable (_fd, rc); + error (connection_error); + return; + } + } if (_send_enabled) { if (!_options.raw_socket) { @@ -162,9 +168,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) if (multicast) { // Multicast addresses should be allowed to bind to more than // one port as all ports should receive the message -#ifdef SO_REUSEPORT rc = rc | set_udp_reuse_port (_fd, true); -#endif // In multicast we should bind ANY and use the mreq struct to // specify the interface @@ -175,6 +179,11 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) real_bind_addr = bind_addr; } + if (rc != 0) { + error (protocol_error); + return; + } + #ifdef ZMQ_HAVE_VXWORKS rc = rc | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (), @@ -184,6 +193,11 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) | bind (_fd, real_bind_addr->as_sockaddr (), real_bind_addr->sockaddr_len ()); #endif + if (rc != 0) { + assert_success_or_recoverable (_fd, rc); + error (connection_error); + return; + } if (multicast) { rc = rc | add_membership (_fd, udp_addr); @@ -224,7 +238,7 @@ int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_, int loop = loop_ ? 1 : 0; int rc = setsockopt (s_, level, optname, reinterpret_cast (&loop), sizeof (loop)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); return rc; } @@ -240,7 +254,7 @@ int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_) int rc = setsockopt (s_, level, IP_MULTICAST_TTL, reinterpret_cast (&hops_), sizeof (hops_)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); return rc; } @@ -270,7 +284,7 @@ int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_, } } - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); return rc; } @@ -279,7 +293,7 @@ int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_) int on = on_ ? 1 : 0; int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast (&on), sizeof (on)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); return rc; } @@ -291,7 +305,7 @@ int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_) int on = on_ ? 1 : 0; int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast (&on), sizeof (on)); - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); return rc; #endif } @@ -322,7 +336,7 @@ int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_) reinterpret_cast (&mreq), sizeof (mreq)); } - assert_socket_tuning_error (s_, rc); + assert_success_or_recoverable (s_, rc); return rc; } @@ -470,17 +484,28 @@ void zmq::udp_engine_t::out_event () #ifdef ZMQ_HAVE_WINDOWS rc = sendto (_fd, _out_buffer, static_cast (size), 0, _out_address, _out_address_len); - wsa_assert (rc != SOCKET_ERROR); #elif defined ZMQ_HAVE_VXWORKS rc = sendto (_fd, reinterpret_cast (_out_buffer), size, 0, (sockaddr *) _out_address, _out_address_len); - errno_assert (rc != -1); #else rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len); - errno_assert (rc != -1); #endif - } else + if (rc < 0) { +#ifdef ZMQ_HAVE_WINDOWS + if (WSAGetLastError () != WSAEWOULDBLOCK) { + assert_success_or_recoverable (_fd, rc); + error (connection_error); + } +#else + if (rc != EWOULDBLOCK) { + assert_success_or_recoverable (_fd, rc); + error (connection_error); + } +#endif + } + } else { reset_pollout (_handle); + } } const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const @@ -510,20 +535,22 @@ void zmq::udp_engine_t::in_event () const int nbytes = recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0, reinterpret_cast (&in_address), &in_addrlen); + + if (nbytes < 0) { #ifdef ZMQ_HAVE_WINDOWS - if (nbytes == SOCKET_ERROR) { - const int last_error = WSAGetLastError (); - wsa_assert (last_error == WSAENETDOWN || last_error == WSAENETRESET - || last_error == WSAEWOULDBLOCK); - return; - } + if (WSAGetLastError () != WSAEWOULDBLOCK) { + assert_success_or_recoverable (_fd, nbytes); + error (connection_error); + } #else - if (nbytes == -1) { - errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM - && errno != ENOTSOCK); + if (nbytes != EWOULDBLOCK) { + assert_success_or_recoverable (_fd, nbytes); + error (connection_error); + } +#endif return; } -#endif + int rc; int body_size; int body_offset;