diff --git a/src/lb.cpp b/src/lb.cpp index 4ce785a141..02af502b98 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file This file is part of libzmq, the ZeroMQ core engine in C++. @@ -107,9 +107,24 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) // Application should handle this as suitable if (_more) { _pipes[_current]->rollback (); + // At this point the pipe is already being deallocated + // and the first N frames are unreachable (_outpipe is + // most likely already NULL so rollback won't actually do + // anything and they can't be un-written to deliver later). + // Return EFAULT to socket_base caller to drop current message + // and any other subsequent frames to avoid them being + // "stuck" and received when a new client reconnects, which + // would break atomicity of multi-part messages (in blocking mode + // socket_base just tries again and again to send the same message) + // Note that given dropping mode returns 0, the user will + // never know that the message could not be delivered, but + // can't really fix it without breaking backward compatibility. + // -2/EAGAIN will make sure socket_base caller does not re-enter + // immediately or after a short sleep in blocking mode. + _dropping = (msg_->flags () & msg_t::more) != 0; _more = false; errno = EAGAIN; - return -1; + return -2; } _active--; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2b5290c412..e12b90a50a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1113,6 +1113,18 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) if (rc == 0) { return 0; } + // Special case for ZMQ_PUSH: -2 means pipe is dead while a + // multi-part send is in progress and can't be recovered, so drop + // silently when in blocking mode to keep backward compatibility. + if (unlikely (rc == -2)) { + if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) { + rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + return 0; + } + } if (unlikely (errno != EAGAIN)) { return -1; } diff --git a/tests/test_spec_pushpull.cpp b/tests/test_spec_pushpull.cpp index 9844eb2244..0cf19e52a8 100644 --- a/tests/test_spec_pushpull.cpp +++ b/tests/test_spec_pushpull.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file + Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file This file is part of libzmq, the ZeroMQ core engine in C++. @@ -240,6 +240,140 @@ void test_destroy_queue_on_disconnect (const char *bind_address_) test_context_socket_close_zero_linger (b); } +// PUSH and PULL: SHALL either receive or drop multipart messages atomically. +void test_push_multipart_atomic_drop (const char *bind_address_, + const bool block) +{ + int linger = 0; + int hwm = 1; + + void *push = test_context_socket (ZMQ_PUSH); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (push, ZMQ_LINGER, &linger, sizeof (linger))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (push, ZMQ_SNDHWM, &hwm, sizeof (hwm))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_)); + size_t addr_len = MAX_SOCKET_STRING; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &addr_len)); + + void *pull = test_context_socket (ZMQ_PULL); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pull, ZMQ_LINGER, &linger, sizeof (linger))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pull, ZMQ_RCVHWM, &hwm, sizeof (hwm))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pull, connect_address)); + + // Wait for connections. + msleep (SETTLE_TIME); + + int rc; + zmq_msg_t msg_data; + // A large message is needed to overrun the TCP buffers + const size_t len = 16 * 1024 * 1024; + size_t zmq_events_size = sizeof (int); + int zmq_events; + + // Normal case - excercise the queues + send_string_expect_success (push, "0", ZMQ_SNDMORE); + send_string_expect_success (push, "0", ZMQ_SNDMORE); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len)); + memset (zmq_msg_data (&msg_data), 'a', len); + TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0)); + + recv_string_expect_success (pull, "0", 0); + recv_string_expect_success (pull, "0", 0); + zmq_msg_init (&msg_data); + TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0)); + zmq_msg_close (&msg_data); + + // Fill the HWMs of sender and receiver, one message each + send_string_expect_success (push, "1", 0); + + send_string_expect_success (push, "2", ZMQ_SNDMORE); + send_string_expect_success (push, "2", ZMQ_SNDMORE); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len)); + memset (zmq_msg_data (&msg_data), 'b', len); + TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0)); + + // Disconnect and simulate a poll (doesn't work on Windows) to + // let the commands run and let the pipes start to be deallocated + TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (pull, connect_address)); + + zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + msleep (SETTLE_TIME); + zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + + // Reconnect and immediately push a large message into the pipe, + // if the problem is reproduced the pipe is in the process of being + // terminated but still exists (state term_ack_sent) and had already + // accepted the frame, so with the first frames already gone and + // unreachable only the last is left, and is stuck in the lb. + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pull, connect_address)); + + send_string_expect_success (push, "3", ZMQ_SNDMORE); + send_string_expect_success (push, "3", ZMQ_SNDMORE); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len)); + memset (zmq_msg_data (&msg_data), 'c', len); + if (block) { + TEST_ASSERT_EQUAL_INT (len, + zmq_msg_send (&msg_data, push, ZMQ_SNDMORE)); + } else { + rc = zmq_msg_send (&msg_data, push, ZMQ_SNDMORE | ZMQ_DONTWAIT); + // inproc won't fail, much faster to connect/disconnect pipes than TCP + if (rc == -1) { + // at this point the new pipe is there and it works + send_string_expect_success (push, "3", ZMQ_SNDMORE); + send_string_expect_success (push, "3", ZMQ_SNDMORE); + TEST_ASSERT_EQUAL_INT (len, + zmq_msg_send (&msg_data, push, ZMQ_SNDMORE)); + } + } + send_string_expect_success (push, "3b", 0); + + zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + msleep (SETTLE_TIME); + zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size); + + send_string_expect_success (push, "5", ZMQ_SNDMORE); + send_string_expect_success (push, "5", ZMQ_SNDMORE); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len)); + memset (zmq_msg_data (&msg_data), 'd', len); + TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0)); + + // On very slow machines the message will not be lost, as it will + // be sent when the new pipe is already in place, so avoid failing + // and simply carry on as it would be very noisy otherwise. + // Receive both to avoid leaking metadata. + // If only the "5" message is received, the problem is reproduced, and + // without the fix the first message received would be the last large + // frame of "3". + char buffer[2]; + rc = + TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pull, buffer, sizeof (buffer), 0)); + TEST_ASSERT_EQUAL_INT (1, rc); + TEST_ASSERT_TRUE (buffer[0] == '3' || buffer[0] == '5'); + if (buffer[0] == '3') { + recv_string_expect_success (pull, "3", 0); + zmq_msg_init (&msg_data); + TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0)); + zmq_msg_close (&msg_data); + recv_string_expect_success (pull, "3b", 0); + recv_string_expect_success (pull, "5", 0); + } + recv_string_expect_success (pull, "5", 0); + zmq_msg_init (&msg_data); + TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0)); + zmq_msg_close (&msg_data); + + test_context_socket_close_zero_linger (pull); + test_context_socket_close_zero_linger (push); +} + #define def_test_spec_pushpull(name, bind_address_) \ void test_spec_pushpull_##name##_push_round_robin_out () \ { \ @@ -256,6 +390,14 @@ void test_destroy_queue_on_disconnect (const char *bind_address_) void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \ { \ test_destroy_queue_on_disconnect (bind_address_); \ + } \ + void test_spec_pushpull_##name##_push_multipart_atomic_drop_block () \ + { \ + test_push_multipart_atomic_drop (bind_address_, true); \ + } \ + void test_spec_pushpull_##name##_push_multipart_atomic_drop_non_block () \ + { \ + test_push_multipart_atomic_drop (bind_address_, false); \ } def_test_spec_pushpull (inproc, "inproc://a") @@ -276,5 +418,9 @@ def_test_spec_pushpull (inproc, "inproc://a") // TODO Tests disabled until libzmq does this properly //RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect); //RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect); + RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_block); + RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_non_block); + RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_block); + RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_non_block); return UNITY_END (); }