From e4a8986ac1e0023998fad5075b4ae0e39d633725 Mon Sep 17 00:00:00 2001 From: Evax Software Date: Fri, 18 Mar 2011 15:23:49 +0100 Subject: [PATCH] Remove brecv function that could block the current VM scheduler --- c_src/erlzmq_nif.c | 107 +++++++++++++++------------------------------ src/erlzmq.erl | 20 +-------- src/erlzmq_nif.erl | 5 +-- 3 files changed, 37 insertions(+), 95 deletions(-) diff --git a/c_src/erlzmq_nif.c b/c_src/erlzmq_nif.c index e70a9a9..5630452 100644 --- a/c_src/erlzmq_nif.c +++ b/c_src/erlzmq_nif.c @@ -44,7 +44,6 @@ NIF(erlzmq_nif_connect); NIF(erlzmq_nif_setsockopt); NIF(erlzmq_nif_getsockopt); NIF(erlzmq_nif_send); -NIF(erlzmq_nif_brecv); NIF(erlzmq_nif_recv); NIF(erlzmq_nif_close); NIF(erlzmq_nif_term); @@ -58,7 +57,6 @@ static ErlNifFunc nif_funcs[] = {"setsockopt", 3, erlzmq_nif_setsockopt}, {"getsockopt", 2, erlzmq_nif_getsockopt}, {"send", 3, erlzmq_nif_send}, - {"brecv", 2, erlzmq_nif_brecv}, {"recv", 2, erlzmq_nif_recv}, {"close", 1, erlzmq_nif_close}, {"term", 1, erlzmq_nif_term} @@ -384,48 +382,6 @@ NIF(erlzmq_nif_send) } -int brecv(zmq_msg_t * msg, erlzmq_socket * socket, int flags) { - int error; - if ((error = zmq_msg_init(msg))) { - return zmq_errno(); - } - - if ((error = zmq_recv(socket->socket, msg, flags))) { - return zmq_errno(); - } - - return 0; -} - -NIF(erlzmq_nif_brecv) -{ - erlzmq_socket * socket; - int _flags; - - if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket, (void **) &socket)) { - return enif_make_badarg(env); - } - - if (!enif_get_int(env, argv[1], &_flags)) { - return enif_make_badarg(env); - } - - int error; - zmq_msg_t msg; - - if ((error = brecv(&msg, socket, _flags))) { - return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, error)); - } - - ErlNifBinary bin; - enif_alloc_binary(zmq_msg_size(&msg), &bin); - memcpy(bin.data, zmq_msg_data(&msg), zmq_msg_size(&msg)); - - zmq_msg_close(&msg); - - return enif_make_tuple2(env, enif_make_atom(env, "ok"), enif_make_binary(env, &bin)); -} - NIF(erlzmq_nif_recv) { @@ -446,45 +402,52 @@ NIF(erlzmq_nif_recv) int error; zmq_msg_t msg; - // try brecv with noblock - - error = brecv(&msg, socket, ZMQ_NOBLOCK); + if (zmq_msg_init(&msg)) { + goto errno_out; + } - if (error == EAGAIN) { // if nothing is there, hand it off to the receiver thread - if (recv.flags & ZMQ_NOBLOCK) { - goto out; - } - recv.env = enif_alloc_env(); - recv.ref = enif_make_ref(recv.env); - recv.socket = socket->socket; + // try recv with noblock + if (zmq_recv(socket->socket, &msg, ZMQ_NOBLOCK)) { + error = zmq_errno(); + if (error == EAGAIN) { // if nothing is there, hand it off to the receiver thread + if (recv.flags & ZMQ_NOBLOCK) { + goto out; + } + recv.env = enif_alloc_env(); + recv.ref = enif_make_ref(recv.env); + recv.socket = socket->socket; - if ((error = zmq_msg_init_size(&msg, sizeof(erlzmq_recv)))) { + if (zmq_msg_init_size(&msg, sizeof(erlzmq_recv))) { goto q_err; - } + } - memcpy(zmq_msg_data(&msg), &recv, sizeof(erlzmq_recv)); + memcpy(zmq_msg_data(&msg), &recv, sizeof(erlzmq_recv)); - if ((error = zmq_send(socket->context->ipc_socket, &msg, 0))) { + if (zmq_send(socket->context->ipc_socket, &msg, 0)) { + zmq_msg_close(&msg); goto q_err; - } + } - zmq_msg_close(&msg); + zmq_msg_close(&msg); - return enif_make_copy(env, recv.ref); + return enif_make_copy(env, recv.ref); q_err: - enif_free_env(recv.env); - return enif_make_tuple2(env, enif_make_atom(env, "error"), - enif_make_int(env, zmq_errno())); - } else if (error == 0) { // return result immediately - ErlNifBinary bin; - enif_alloc_binary(zmq_msg_size(&msg), &bin); - memcpy(bin.data, zmq_msg_data(&msg), zmq_msg_size(&msg)); + enif_free_env(recv.env); + goto errno_out; + } else { + goto out; + } + } + ErlNifBinary bin; + enif_alloc_binary(zmq_msg_size(&msg), &bin); + memcpy(bin.data, zmq_msg_data(&msg), zmq_msg_size(&msg)); - zmq_msg_close(&msg); + zmq_msg_close(&msg); - return enif_make_tuple2(env, enif_make_atom(env, "ok"), - enif_make_binary(env, &bin)); - } + return enif_make_tuple2(env, enif_make_atom(env, "ok"), + enif_make_binary(env, &bin)); +errno_out: + error = zmq_errno(); out: return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, error)); diff --git a/src/erlzmq.erl b/src/erlzmq.erl index 359180d..05c400d 100644 --- a/src/erlzmq.erl +++ b/src/erlzmq.erl @@ -2,8 +2,7 @@ %% @headerfile "erlzmq.hrl" -include_lib("erlzmq.hrl"). -export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, - brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2, - close/1, term/1, term/2]). + recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1, term/2]). -export_type([erlzmq_socket/0, erlzmq_context/0]). %% @equiv context(1) @@ -89,23 +88,6 @@ send(Socket, Binary) -> send(Socket, Binary, Flags) when is_list(Flags) -> erlzmq_result(erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags))). -%% @equiv brecv(Socket, 0) -%% @spec brecv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error() --spec brecv(Socket :: erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error(). - -brecv(Socket) -> - erlzmq_result(brecv(Socket, [])). - -%% @doc Receive a message from a socket in a blocking way. -%% This function can block the current VM scheduler. DO NOT USE IT UNLESS YOU REALLY KNOW WHAT YOU ARE DOING. -%% @end -%% @spec brecv(erlzmq_socket(), erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error() --spec brecv(Socket :: erlzmq_socket(), Flags :: erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error(). - -brecv(Socket, Flags) when is_list(Flags) -> - erlzmq_result( erlzmq_nif:brecv(Socket, sendrecv_flags(Flags))). - - %% @equiv recv(Socket, 0) %% @spec recv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error() -spec recv(Socket :: erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error(). diff --git a/src/erlzmq_nif.erl b/src/erlzmq_nif.erl index aa45ca2..c35e655 100644 --- a/src/erlzmq_nif.erl +++ b/src/erlzmq_nif.erl @@ -1,7 +1,7 @@ %% @hidden -module(erlzmq_nif). --export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]). +-export([context/1, socket/2, bind/2, connect/2, send/3, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]). -on_load(init/0). @@ -32,9 +32,6 @@ connect(_Socket, _Endpoint) -> send(_Socket, _Binary, _Flags) -> erlang:nif_error(not_loaded). -brecv(_Socket, _Flags) -> - erlang:nif_error(not_loaded). - recv(_Socket, _Flags) -> erlang:nif_error(not_loaded).