Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve oom handling #2471

Merged
merged 4 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions RELICENSE/t-b.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL

This is a statement by Thomas Braun
that grants permission to relicense its copyrights in the libzmq C++
library (ZeroMQ) under the Mozilla Public License v2 (MPLv2) or any other
Open Source Initiative approved license chosen by the current ZeroMQ
BDFL (Benevolent Dictator for Life).

A portion of the commits made by the Github handle "t-b", with
commit author "thomas.braun@virtuell-zuhause.de" and "thomas.braun@byte-physics.de", are copyright of Thomas Braun.
This document hereby grants the libzmq project team to relicense libzmq,
including all past, present and future contributions of the author listed above.

Thomas Braun
2017/03/27
9 changes: 7 additions & 2 deletions src/gssapi_mechanism_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ int zmq::gssapi_mechanism_base_t::encode_message (msg_t *msg_)
flags |= 0x02;

uint8_t *plaintext_buffer = static_cast <uint8_t *>(malloc(msg_->size ()+1));
alloc_assert(plaintext_buffer);

plaintext_buffer[0] = flags;
memcpy (plaintext_buffer+1, msg_->data(), msg_->size());

Expand Down Expand Up @@ -149,8 +151,9 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_)
// TODO: instead of malloc/memcpy, can we just do: wrapped.value = ptr;
const size_t alloc_length = wrapped.length? wrapped.length: 1;
wrapped.value = static_cast <char *> (malloc (alloc_length));
alloc_assert (wrapped.value);

if (wrapped.length) {
alloc_assert (wrapped.value);
memcpy(wrapped.value, ptr, wrapped.length);
ptr += wrapped.length;
bytes_left -= wrapped.length;
Expand Down Expand Up @@ -247,9 +250,11 @@ int zmq::gssapi_mechanism_base_t::process_initiate (msg_t *msg_, void **token_va
errno = EPROTO;
return -1;
}

*token_value_ = static_cast <char *> (malloc (token_length_ ? token_length_ : 1));
alloc_assert (*token_value_);

if (token_length_) {
alloc_assert (*token_value_);
memcpy(*token_value_, ptr, token_length_);
ptr += token_length_;
bytes_left -= token_length_;
Expand Down
4 changes: 3 additions & 1 deletion src/norm_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,9 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
if (NULL == rxState)
{
// This is a new stream, so create rxState with zmq decoder, etc
rxState = new NormRxStreamState(object, options.maxmsgsize);
rxState = new (std::nothrow) NormRxStreamState(object, options.maxmsgsize);
errno_assert(rxState);

if (!rxState->Init())
{
errno_assert(false);
Expand Down
2 changes: 2 additions & 0 deletions src/req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ int zmq::req_t::xsend (msg_t *msg_)

// Copy request id before sending (see issue #1695 for details).
uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
zmq_assert (request_id_copy);

*request_id_copy = request_id;

msg_t id;
Expand Down
2 changes: 2 additions & 0 deletions src/signaler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,8 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
if (*r_ != INVALID_SOCKET) {
size_t dummy_size = 1024 * 1024; // 1M to overload default receive buffer
unsigned char *dummy = (unsigned char *) malloc (dummy_size);
wsa_assert (dummy);

int still_to_send = (int) dummy_size;
int still_to_recv = (int) dummy_size;
while (still_to_send || still_to_recv) {
Expand Down
12 changes: 9 additions & 3 deletions src/socket_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,14 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;

if (thread_safe)
mailbox = new mailbox_safe_t(&sync);
{
mailbox = new (std::nothrow) mailbox_safe_t(&sync);
zmq_assert (mailbox);
}
else {
mailbox_t *m = new mailbox_t();
mailbox_t *m = new (std::nothrow) mailbox_t();
zmq_assert (m);

if (m->get_fd () != retired_fd)
mailbox = m;
else {
Expand Down Expand Up @@ -1298,7 +1303,8 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
else {
scoped_optional_lock_t sync_lock(thread_safe ? &sync : NULL);

reaper_signaler = new signaler_t();
reaper_signaler = new (std::nothrow) signaler_t();
zmq_assert (reaper_signaler);

// Add signaler to the safe mailbox
fd = reaper_signaler->get_fd();
Expand Down
4 changes: 4 additions & 0 deletions src/stream_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// Compile metadata.
zmq_assert (metadata == NULL);
metadata = new (std::nothrow) metadata_t (properties);
alloc_assert (metadata);
}

if (options.raw_notify) {
Expand Down Expand Up @@ -861,7 +862,10 @@ void zmq::stream_engine_t::mechanism_ready ()

zmq_assert (metadata == NULL);
if (!properties.empty ())
{
metadata = new (std::nothrow) metadata_t (properties);
alloc_assert (metadata);
}

#ifdef ZMQ_BUILD_DRAFT_API
socket->event_handshake_succeed(endpoint, 0);
Expand Down
2 changes: 1 addition & 1 deletion src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// implement zmq_poll on top of zmq_poller
int rc;
zmq_poller_event_t *events;
events = new zmq_poller_event_t[nitems_];
events = new (std::nothrow) zmq_poller_event_t[nitems_];
alloc_assert(events);
void *poller = zmq_poller_new ();
alloc_assert(poller);
Expand Down
6 changes: 4 additions & 2 deletions src/zmq_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "atomic_counter.hpp"
#include "atomic_ptr.hpp"
#include <assert.h>
#include <new>
#include <stdint.h>

#if !defined ZMQ_HAVE_WINDOWS
Expand Down Expand Up @@ -75,7 +76,8 @@ unsigned long zmq_stopwatch_stop (void *watch_)

void *zmq_threadstart(zmq_thread_fn* func, void* arg)
{
zmq::thread_t* thread = new zmq::thread_t;
zmq::thread_t* thread = new (std::nothrow) zmq::thread_t;
alloc_assert(thread);
thread->start(func, arg);
return thread;
}
Expand Down Expand Up @@ -265,7 +267,7 @@ int zmq_curve_public (char *z85_public_key, const char *z85_secret_key)

void *zmq_atomic_counter_new (void)
{
zmq::atomic_counter_t *counter = new zmq::atomic_counter_t;
zmq::atomic_counter_t *counter = new (std::nothrow) zmq::atomic_counter_t;
alloc_assert (counter);
return counter;
}
Expand Down