Skip to content

Commit

Permalink
Added ezmq:close/1 and ezmq:term/1 functions as GC cleanup magic was …
Browse files Browse the repository at this point in the history
…often resulting in some hard to debug behaviour.
  • Loading branch information
yrashk committed Mar 4, 2011
1 parent 8e258f8 commit 307d4ee
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 24 deletions.
58 changes: 41 additions & 17 deletions c_src/ezmq_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ NIF(ezmq_nif_getsockopt);
NIF(ezmq_nif_send);
NIF(ezmq_nif_brecv);
NIF(ezmq_nif_recv);
NIF(ezmq_nif_close);
NIF(ezmq_nif_term);

static ErlNifFunc nif_funcs[] =
{
Expand All @@ -55,7 +57,9 @@ static ErlNifFunc nif_funcs[] =
{"getsockopt", 2, ezmq_nif_getsockopt},
{"send", 3, ezmq_nif_send},
{"brecv", 2, ezmq_nif_brecv},
{"recv", 2, ezmq_nif_recv}
{"recv", 2, ezmq_nif_recv},
{"close", 1, ezmq_nif_close},
{"term", 1, ezmq_nif_term}
};

void * polling_thread(void * handle);
Expand Down Expand Up @@ -570,9 +574,30 @@ void * polling_thread(void * handle)
return NULL;
}

static void ezmq_nif_resource_context_cleanup(ErlNifEnv* env, void* arg)
NIF(ezmq_nif_close)
{
ezmq_context * ctx = (ezmq_context *)arg;

ezmq_socket * socket;

if (!enif_get_resource(env, argv[0], ezmq_nif_resource_socket, (void **) &socket)) {
return enif_make_badarg(env);
}

if (-1 == zmq_close(socket->socket)) {
return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
} else {
return enif_make_atom(env, "ok");
}
}

NIF(ezmq_nif_term)
{
ezmq_context * ctx;

if (!enif_get_resource(env, argv[0], ezmq_nif_resource_context, (void **) &ctx)) {
return enif_make_badarg(env);
}

zmq_msg_t msg;
ezmq_recv recv;
recv.env = NULL;
Expand All @@ -585,29 +610,28 @@ static void ezmq_nif_resource_context_cleanup(ErlNifEnv* env, void* arg)
free(ctx->ipc_socket_name);
enif_mutex_destroy(ctx->mutex);
enif_cond_destroy(ctx->cond);
zmq_term(ctx->context);
}

static void ezmq_nif_resource_socket_cleanup(ErlNifEnv* env, void* arg)
{
ezmq_socket * socket = (ezmq_socket *)arg;
zmq_close(socket->socket);
if (-1 == zmq_term(ctx->context)) {
return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
} else {
return enif_make_atom(env, "ok");
}
}

static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
ezmq_nif_resource_context =
enif_open_resource_type(env, "ezmq_nif",
"ezmq_nif_resource_context",
&ezmq_nif_resource_context_cleanup,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
0);
"ezmq_nif_resource_context",
NULL,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
0);
ezmq_nif_resource_socket =
enif_open_resource_type(env, "ezmq_nif",
"ezmq_nif_resource_socket",
&ezmq_nif_resource_socket_cleanup,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
0);
"ezmq_nif_resource_socket",
NULL,
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
0);
return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion perf/local_lat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ main([BindTo,MessageSizeStr,RoundtripCountStr]) ->
RMsg = Msg,
ezmq:send(Socket, Msg)
end,
[ Do() || _I <- lists:seq(1,RoundtripCount) ].
[ Do() || _I <- lists:seq(1,RoundtripCount) ],
ezmq:close(Socket),
ezmq:term(Context).

5 changes: 4 additions & 1 deletion perf/local_thr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ main([BindTo,MessageSizeStr,MessageCountStr]) ->
"message count: ~p~n"
"mean throughput: ~p [msg/s]~n"
"mean throughput: ~p [Mb/s]~n",
[MessageSize, MessageCount, Throughput, Megabits]).
[MessageSize, MessageCount, Throughput, Megabits]),

ezmq:close(Socket),
ezmq:term(Context).

4 changes: 3 additions & 1 deletion perf/remote_lat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ main([ConnectTo,MessageSizeStr,RoundtripCountStr]) ->
io:format("message size: ~p [B]~n"
"roundtrip count: ~p~n"
"average latency: ~p [us]~n",
[MessageSize, RoundtripCount, Latency]).
[MessageSize, RoundtripCount, Latency]),
ezmq:close(Socket),
ezmq:term(Context).
5 changes: 3 additions & 2 deletions perf/remote_thr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ main([ConnectTo,MessageSizeStr,MessageCountStr]) ->
{ok, Socket} = ezmq:socket(Context,pub),
ezmq:connect(Socket, ConnectTo),
Msg = list_to_binary(lists:duplicate(MessageSize, 0)),
ezmq_perf:send_loop(MessageCount, Socket, Msg).

ezmq_perf:send_loop(MessageCount, Socket, Msg),
ezmq:close(Socket),
ezmq:term(Context).
8 changes: 7 additions & 1 deletion src/ezmq.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(ezmq).
-include_lib("ezmq.hrl").
-export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2]).
-export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]).

context() ->
context(1).
Expand Down Expand Up @@ -52,6 +52,12 @@ setsockopt(Socket, Name, Value) ->
getsockopt(Socket, Name) ->
ezmq_nif:getsockopt(Socket, option_name(Name)).

close(Socket) ->
ezmq_nif:close(Socket).

term(Context) ->
ezmq_nif:term(Context).

%% Private

socket_type(pair) ->
Expand Down
8 changes: 7 additions & 1 deletion src/ezmq_nif.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(ezmq_nif).

-export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2]).
-export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]).

-on_load(init/0).

Expand Down Expand Up @@ -42,3 +42,9 @@ setsockopt(_Socket, _OptionName, _OptionValue) ->

getsockopt(_Socket, _OptionName) ->
erlang:nif_error(not_loaded).

close(_Socket) ->
erlang:nif_error(not_loaded).

term(_Context) ->
erlang:nif_error(not_loaded).

0 comments on commit 307d4ee

Please sign in to comment.