Skip to content

Problem: In zmq::socket_base_t::send method with or without ZMQ_ROUTER_MANDATORY option set #2348

Closed
@reza-ebrahimi

Description

@reza-ebrahimi

This is the send method in socket_base_t class, and there is a bug here, in the case of ROUTER socket, we have two processes, one is ROUTER process and another is DEALER process, code below is the ROUTER process:

//  Send a message to an unknown peer with mandatory routing
//  This will fail
int mandatory = 1;
int rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
assert (rc == 0);

while (true) {
	//  Get message from dealer to know when connection is ready
	char buffer[255];
	rc = zmq_recv(router, buffer, 255, 0); // for identity
	rc = zmq_recv(router, buffer, 255, 0); // for another part of message

	Sleep(6000); // in this time gap we close DEALER process

	//  Send a message to disconnected dealer now
        //  and it will block here in zmq_send method
	rc = zmq_send(router, "X", 1, ZMQ_SNDMORE);
	assert(rc == 1);
	rc = zmq_send(router, "Hello", 5, 0);
	assert(rc == 5);
}

and there problem is in below send method, due to ZMQ_ROUTER_MANDATORY option, it might return EHOSTUNREACH or -1, but it goes to last while loop to process remained commands and blocks in select system call untill another DEALER process coming up!

int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{

scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);

//  Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
    errno = ETERM;
    return -1;
}

//  Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
    errno = EFAULT;
    return -1;
}

//  Process pending commands, if any.
int rc = process_commands (0, true);
if (unlikely (rc != 0)) {
    return -1;
}

//  Clear any user-visible flags that are set on the message.
msg_->reset_flags (msg_t::more);

//  At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDMORE)
    msg_->set_flags (msg_t::more);

msg_->reset_metadata ();

//  Try to send the message using method in each socket class
rc = xsend (msg_);
if (rc == 0) {
    return 0;
}
if (unlikely (errno != EAGAIN)) {
    return -1;
}

//  In case of non-blocking send we'll simply propagate
//  the error - including EAGAIN - up the stack.
if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
    return -1;
}

//  Compute the time when the timeout should occur.
//  If the timeout is infinite, don't care.
int timeout = options.sndtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);

//  Oops, we couldn't send the message. Wait for the next
//  command, process it and try to send the message again.
//  If timeout is reached in the meantime, return EAGAIN.
while (true) {
    if (unlikely (process_commands (timeout, false) != 0)) {
        return -1;
    }
    rc = xsend (msg_);
    if (rc == 0)
        break;
    if (unlikely (errno != EAGAIN)) {
        return -1;
    }
    if (timeout > 0) {
        timeout = (int) (end - clock.now_ms ());
        if (timeout <= 0) {
            errno = EAGAIN;
            return -1;
        }
    }
}

return 0;

}
  1. if ZMQ_ROUTER_MANDATORY option is set, it should return before reach to last while loop, is still processing command for a disconnected socket necessary? or if ZMQ_ROUTER_MANDATORY option is set?

  2. blocking in send method is unacceptable by any reason, in this scenario it should return actual error code with its string to the caller.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions