diff --git a/CMakeLists.txt b/CMakeLists.txt index cfceada220..3def97af15 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -609,7 +609,8 @@ set(tests test_spec_router test_sub_forward test_term_endpoint - test_timeo) + test_timeo + test_inproc_connect_before_bind) if(NOT WIN32) list(APPEND tests test_monitor diff --git a/src/ctx.cpp b/src/ctx.cpp index c6e8e57a38..0dcbf93943 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -392,6 +392,34 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } +void zmq::ctx_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_) +{ + endpoints_sync.lock (); + + // Todo, use multimap to support multiple pending connections + pending_connections[addr_] = pending_connection_; + + endpoints_sync.unlock (); +} + +zmq::pending_connection_t zmq::ctx_t::next_pending_connection(const char *addr_) +{ + endpoints_sync.lock (); + + pending_connections_t::iterator it = pending_connections.find (addr_); + if (it == pending_connections.end ()) { + + endpoints_sync.unlock (); + pending_connection_t empty = {NULL, NULL}; + return empty; + } + pending_connection_t pending_connection = it->second; + pending_connections.erase(it); + + endpoints_sync.unlock (); + return pending_connection; +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index bb498a6520..10a34dec66 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -40,6 +40,7 @@ namespace zmq class io_thread_t; class socket_base_t; class reaper_t; + class pipe_t; // Information associated with inproc endpoint. Note that endpoint options // are registered as well so that the peer can access them without a need @@ -50,6 +51,12 @@ namespace zmq options_t options; }; + struct pending_connection_t + { + socket_base_t *socket; + pipe_t* pipe; + }; + // Context object encapsulates all the global state associated with // the library. @@ -101,6 +108,8 @@ namespace zmq int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); + void pend_connection (const char *addr_, const pending_connection_t &pending_connection_); + pending_connection_t next_pending_connection (const char *addr_); enum { term_tid = 0, @@ -156,6 +165,10 @@ namespace zmq typedef std::map endpoints_t; endpoints_t endpoints; + // List of inproc connection endpoints pending a bind + typedef std::map pending_connections_t; + pending_connections_t pending_connections; + // Synchronisation of access to the list of inproc endpoints. mutex_t endpoints_sync; diff --git a/src/object.cpp b/src/object.cpp index 8ea9933fc3..fec067554a 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -49,6 +49,11 @@ uint32_t zmq::object_t::get_tid () return tid; } +void zmq::object_t::set_tid(uint32_t id) +{ + tid = id; +} + zmq::ctx_t *zmq::object_t::get_ctx () { return ctx; @@ -143,6 +148,16 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) return ctx->find_endpoint (addr_); } +void zmq::object_t::pend_connection (const char *addr_, const pending_connection_t &pending_connection_) +{ + ctx->pend_connection (addr_, pending_connection_); +} + +zmq::pending_connection_t zmq::object_t::next_pending_connection (const char *addr_) +{ + return ctx->next_pending_connection(addr_); +} + void zmq::object_t::destroy_socket (socket_base_t *socket_) { ctx->destroy_socket (socket_); diff --git a/src/object.hpp b/src/object.hpp index b20d2b8723..13851652f3 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -27,6 +27,7 @@ namespace zmq struct i_engine; struct endpoint_t; + struct pending_connection_t; struct command_t; class ctx_t; class pipe_t; @@ -47,6 +48,7 @@ namespace zmq virtual ~object_t (); uint32_t get_tid (); + void set_tid(uint32_t id); ctx_t *get_ctx (); void process_command (zmq::command_t &cmd_); @@ -57,6 +59,9 @@ namespace zmq int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_); void unregister_endpoints (zmq::socket_base_t *socket_); zmq::endpoint_t find_endpoint (const char *addr_); + void pend_connection (const char *addr_, const pending_connection_t &pending_connection_); + zmq::pending_connection_t next_pending_connection (const char *addr_); + void destroy_socket (zmq::socket_base_t *socket_); // Logs an message. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 84a4923473..ebadd08089 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -344,6 +344,50 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc == 0) { // Save last endpoint URI last_endpoint.assign (addr_); + + pending_connection_t pending_connection = next_pending_connection(addr_); + while (pending_connection.pipe != NULL) + { + inc_seqnum(); + //// If required, send the identity of the local socket to the peer. + //if (peer.options.recv_identity) { + // msg_t id; + // rc = id.init_size (options.identity_size); + // errno_assert (rc == 0); + // memcpy (id.data (), options.identity, options.identity_size); + // id.set_flags (msg_t::identity); + // bool written = new_pipes [0]->write (&id); + // zmq_assert (written); + // new_pipes [0]->flush (); + //} + + //// If required, send the identity of the peer to the local socket. + //if (options.recv_identity) { + // msg_t id; + // rc = id.init_size (peer.options.identity_size); + // errno_assert (rc == 0); + // memcpy (id.data (), peer.options.identity, peer.options.identity_size); + // id.set_flags (msg_t::identity); + // bool written = new_pipes [1]->write (&id); + // zmq_assert (written); + // new_pipes [1]->flush (); + //} + + //// Attach remote end of the pipe to the peer socket. Note that peer's + //// seqnum was incremented in find_endpoint function. We don't need it + //// increased here. + //send_bind (peer.socket, new_pipes [1], false); + + pending_connection.pipe->set_tid(get_tid()); + + command_t cmd; + cmd.type = command_t::bind; + cmd.args.bind.pipe = pending_connection.pipe; + process_command(cmd); + + + pending_connection = next_pending_connection(addr_); + } } return rc; } @@ -435,8 +479,6 @@ int zmq::socket_base_t::connect (const char *addr_) // Find the peer endpoint. endpoint_t peer = find_endpoint (addr_); - if (!peer.socket) - return -1; // The total HWM for an inproc connection should be the sum of // the binder's HWM and the connector's HWM. @@ -448,7 +490,7 @@ int zmq::socket_base_t::connect (const char *addr_) rcvhwm = options.rcvhwm + peer.options.sndhwm; // Create a bi-directional pipe to connect the peers. - object_t *parents [2] = {this, peer.socket}; + object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; bool conflate = options.conflate && @@ -466,34 +508,43 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach local end of the pipe to this socket object. attach_pipe (new_pipes [0]); - // If required, send the identity of the local socket to the peer. - if (peer.options.recv_identity) { - msg_t id; - rc = id.init_size (options.identity_size); - errno_assert (rc == 0); - memcpy (id.data (), options.identity, options.identity_size); - id.set_flags (msg_t::identity); - bool written = new_pipes [0]->write (&id); - zmq_assert (written); - new_pipes [0]->flush (); + if (!peer.socket) + { + pending_connection_t pending_connection = {this, new_pipes [1]}; + pend_connection (addr_, pending_connection); } + else + { + // If required, send the identity of the local socket to the peer. + if (peer.options.recv_identity) { + + msg_t id; + rc = id.init_size (options.identity_size); + errno_assert (rc == 0); + memcpy (id.data (), options.identity, options.identity_size); + id.set_flags (msg_t::identity); + bool written = new_pipes [0]->write (&id); + zmq_assert (written); + new_pipes [0]->flush (); + } - // If required, send the identity of the peer to the local socket. - if (options.recv_identity) { - msg_t id; - rc = id.init_size (peer.options.identity_size); - errno_assert (rc == 0); - memcpy (id.data (), peer.options.identity, peer.options.identity_size); - id.set_flags (msg_t::identity); - bool written = new_pipes [1]->write (&id); - zmq_assert (written); - new_pipes [1]->flush (); - } + // If required, send the identity of the peer to the local socket. + if (options.recv_identity) { + msg_t id; + rc = id.init_size (peer.options.identity_size); + errno_assert (rc == 0); + memcpy (id.data (), peer.options.identity, peer.options.identity_size); + id.set_flags (msg_t::identity); + bool written = new_pipes [1]->write (&id); + zmq_assert (written); + new_pipes [1]->flush (); + } - // Attach remote end of the pipe to the peer socket. Note that peer's - // seqnum was incremented in find_endpoint function. We don't need it - // increased here. - send_bind (peer.socket, new_pipes [1], false); + // Attach remote end of the pipe to the peer socket. Note that peer's + // seqnum was incremented in find_endpoint function. We don't need it + // increased here. + send_bind (peer.socket, new_pipes [1], false); + } // Save last endpoint URI last_endpoint.assign (addr_); @@ -636,7 +687,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) errno = ENOENT; return -1; } - + for (inprocs_t::iterator it = range.first; it != range.second; ++it) it->second->terminate(true); inprocs.erase (range.first, range.second); @@ -655,7 +706,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) if (it->second.second != NULL) it->second.second->terminate(false); term_child (it->second.first); - } + } endpoints.erase (range.first, range.second); return 0; } @@ -1223,20 +1274,20 @@ void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_) void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_) { if (monitor_socket) { - const uint16_t eid = (uint16_t)event_.event ; - const uint32_t value = (uint32_t)event_.value ; - // prepare and send first message frame - // containing event id and value + const uint16_t eid = (uint16_t)event_.event ; + const uint32_t value = (uint32_t)event_.value ; + // prepare and send first message frame + // containing event id and value zmq_msg_t msg; zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value)); - char* data1 = (char*)zmq_msg_data(&msg); + char* data1 = (char*)zmq_msg_data(&msg); memcpy (data1, &eid, sizeof(eid)); memcpy (data1+sizeof(eid), &value, sizeof(value)); zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE); - // prepare and send second message frame - // containing the address (endpoint) + // prepare and send second message frame + // containing the address (endpoint) zmq_msg_init_size (&msg, addr_.size()); - memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size()); + memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size()); zmq_sendmsg (monitor_socket, &msg, 0); } } diff --git a/tests/test_ctx_destroy.cpp b/tests/test_ctx_destroy.cpp index ce31e42a83..a78a05a9ca 100644 --- a/tests/test_ctx_destroy.cpp +++ b/tests/test_ctx_destroy.cpp @@ -63,6 +63,9 @@ void test_ctx_shutdown() // Spawn a thread to receive on socket void *receiver_thread = zmq_threadstart (&receiver, socket); + // Wait for thread to start up and block + zmq_sleep (1); + // Shutdown context, if we used destroy here we would deadlock. rc = zmq_ctx_shutdown (ctx); assert (rc == 0); diff --git a/tests/test_inproc_connect_before_bind.cpp b/tests/test_inproc_connect_before_bind.cpp new file mode 100644 index 0000000000..04f78aac17 --- /dev/null +++ b/tests/test_inproc_connect_before_bind.cpp @@ -0,0 +1,114 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include "testutil.hpp" + +void test_bind_before_connect() +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Bind first + void *bindSocket = zmq_socket (ctx, ZMQ_PAIR); + assert (bindSocket); + int rc = zmq_bind (bindSocket, "inproc://a"); + assert (rc == 0); + + // Now connect + void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); + assert (connectSocket); + rc = zmq_connect (connectSocket, "inproc://a"); + assert (rc == 0); + + // Queue up some data + rc = zmq_send_const (connectSocket, "foobar", 6, 0); + assert (rc == 6); + + // Read pending message + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_msg_recv (&msg, bindSocket, ZMQ_NOBLOCK); + assert (rc == 6); + void *data = zmq_msg_data (&msg); + assert (memcmp ("foobar", data, 6) == 0); + + // Cleanup + rc = zmq_close (connectSocket); + assert (rc == 0); + + rc = zmq_close (bindSocket); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +void test_connect_before_bind() +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Connect first + void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); + assert (connectSocket); + int rc = zmq_connect (connectSocket, "inproc://a"); + assert (rc == 0); + + + // Queue up some data + rc = zmq_send_const (connectSocket, "foobar", 6, 0); + assert (rc == 6); + + // Now bind + void *bindSocket = zmq_socket (ctx, ZMQ_PAIR); + assert (bindSocket); + rc = zmq_bind (bindSocket, "inproc://a"); + assert (rc == 0); + + // Read pending message + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_msg_recv (&msg, bindSocket, ZMQ_NOBLOCK); + assert (rc == 6); + void *data = zmq_msg_data (&msg); + assert (memcmp ("foobar", data, 6) == 0); + + // Cleanup + rc = zmq_close (connectSocket); + assert (rc == 0); + + rc = zmq_close (bindSocket); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + + test_bind_before_connect(); + test_connect_before_bind(); + + return 0 ; +} diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp index 133df82749..20ab071c29 100644 --- a/tests/test_pair_inproc.cpp +++ b/tests/test_pair_inproc.cpp @@ -55,7 +55,7 @@ int main (void) rc = zmq_msg_recv (&msg, sc, 0); assert (rc == 6); data = zmq_msg_data (&msg); - assert (memcmp ("foobar", data, 3) == 0); + assert (memcmp ("foobar", data, 6) == 0); // Cleanup