Description
This issue concerns the send method of the socket_base_t class, which has a bug that shows up with ROUTER sockets.
The setup uses two processes: one is the ROUTER process and the other is the DEALER process.
The code below is the ROUTER process:
// Reproduction (ROUTER side): enable mandatory routing, wait for a message
// from the DEALER, let the DEALER process be closed, then try to reply.
// The reporter expects the send to fail; instead it is observed to block.
int mandatory = 1;
int rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
assert (rc == 0);
while (true) {
// Get message from dealer to know when connection is ready.
// NOTE(review): the results of both zmq_recv calls are overwritten without
// being checked.
char buffer[255];
rc = zmq_recv(router, buffer, 255, 0); // first part: the peer's identity
rc = zmq_recv(router, buffer, 255, 0); // second part: the rest of the message
Sleep(6000); // during this 6000-unit pause (presumably ms) the DEALER process is closed by hand
// Send a two-part message to the now-disconnected dealer.
// According to the report, execution blocks inside zmq_send here.
// NOTE(review): the first part sent is the literal "X", not the identity
// received above - confirm this is the intended peer address.
rc = zmq_send(router, "X", 1, ZMQ_SNDMORE);
assert(rc == 1);
rc = zmq_send(router, "Hello", 5, 0);
assert(rc == 5);
}
The problem is in the send method below. Because of the ZMQ_ROUTER_MANDATORY
option, it might return EHOSTUNREACH or -1, but instead it goes on to the last
while loop to process the remaining commands and blocks in the select system
call until another DEALER process comes up!
//  Sends msg_ on this socket, honouring ZMQ_SNDMORE and ZMQ_DONTWAIT in
//  flags_ as well as the configured send timeout (options.sndtimeo).
//
//  Returns 0 once xsend () accepts the message. Returns -1 otherwise, with
//  errno set to:
//    ETERM  - ctx_terminated is set;
//    EFAULT - msg_ is null or fails its own validity check;
//    EAGAIN - the send is non-blocking and could not complete, or the
//             send timeout ran out;
//    or whatever process_commands () / xsend () left in errno.
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    //  The lock is only taken for thread-safe sockets.
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);

    //  Refuse to work once the context has been terminated.
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Reject a null or invalid message.
    if (unlikely (!msg_ || !msg_->check ())) {
        errno = EFAULT;
        return -1;
    }

    //  Handle any commands that are already pending.
    if (unlikely (process_commands (0, true) != 0))
        return -1;

    //  The 'more' flag is dictated solely by flags_: clear whatever the
    //  message carried, then set it again only if ZMQ_SNDMORE was given.
    msg_->reset_flags (msg_t::more);
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  First attempt, delegated to the concrete socket type.
    if (xsend (msg_) == 0)
        return 0;

    //  Only EAGAIN is worth retrying; any other error goes to the caller.
    if (unlikely (errno != EAGAIN))
        return -1;

    //  A non-blocking send propagates EAGAIN straight up the stack.
    const bool non_blocking =
      (flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0;
    if (non_blocking)
        return -1;

    //  Work out the deadline. A negative timeout means "wait forever",
    //  in which case the deadline value is never consulted.
    int remaining_ms = options.sndtimeo;
    const uint64_t deadline =
      remaining_ms < 0 ? 0 : (clock.now_ms () + remaining_ms);

    //  Wait for the next command, process it and retry the send until it
    //  succeeds, fails for a reason other than EAGAIN, or times out.
    for (;;) {
        if (unlikely (process_commands (remaining_ms, false) != 0))
            return -1;

        if (xsend (msg_) == 0)
            return 0;

        if (unlikely (errno != EAGAIN))
            return -1;

        if (remaining_ms > 0) {
            remaining_ms = static_cast<int> (deadline - clock.now_ms ());
            if (remaining_ms <= 0) {
                errno = EAGAIN;
                return -1;
            }
        }
    }
}
- If the ZMQ_ROUTER_MANDATORY option is set, it should return before reaching the last while loop. Is it still necessary to process commands for a disconnected socket, or to do so when the ZMQ_ROUTER_MANDATORY option is set?
- Blocking in the send method is unacceptable for any reason; in this scenario it should return the actual error code, with its error string, to the caller.