diff --git a/c_src/ezmq_nif.c b/c_src/ezmq_nif.c index c5c2374..9af5132 100644 --- a/c_src/ezmq_nif.c +++ b/c_src/ezmq_nif.c @@ -44,6 +44,8 @@ NIF(ezmq_nif_getsockopt); NIF(ezmq_nif_send); NIF(ezmq_nif_brecv); NIF(ezmq_nif_recv); +NIF(ezmq_nif_close); +NIF(ezmq_nif_term); static ErlNifFunc nif_funcs[] = { @@ -55,7 +57,9 @@ static ErlNifFunc nif_funcs[] = {"getsockopt", 2, ezmq_nif_getsockopt}, {"send", 3, ezmq_nif_send}, {"brecv", 2, ezmq_nif_brecv}, - {"recv", 2, ezmq_nif_recv} + {"recv", 2, ezmq_nif_recv}, + {"close", 1, ezmq_nif_close}, + {"term", 1, ezmq_nif_term} }; void * polling_thread(void * handle); @@ -570,9 +574,30 @@ void * polling_thread(void * handle) return NULL; } -static void ezmq_nif_resource_context_cleanup(ErlNifEnv* env, void* arg) +NIF(ezmq_nif_close) { - ezmq_context * ctx = (ezmq_context *)arg; + + ezmq_socket * socket; + + if (!enif_get_resource(env, argv[0], ezmq_nif_resource_socket, (void **) &socket)) { + return enif_make_badarg(env); + } + + if (-1 == zmq_close(socket->socket)) { + return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno())); + } else { + return enif_make_atom(env, "ok"); + } +} + +NIF(ezmq_nif_term) +{ + ezmq_context * ctx; + + if (!enif_get_resource(env, argv[0], ezmq_nif_resource_context, (void **) &ctx)) { + return enif_make_badarg(env); + } + zmq_msg_t msg; ezmq_recv recv; recv.env = NULL; @@ -585,29 +610,28 @@ static void ezmq_nif_resource_context_cleanup(ErlNifEnv* env, void* arg) free(ctx->ipc_socket_name); enif_mutex_destroy(ctx->mutex); enif_cond_destroy(ctx->cond); - zmq_term(ctx->context); -} -static void ezmq_nif_resource_socket_cleanup(ErlNifEnv* env, void* arg) -{ - ezmq_socket * socket = (ezmq_socket *)arg; - zmq_close(socket->socket); + if (-1 == zmq_term(ctx->context)) { + return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno())); + } else { + return enif_make_atom(env, "ok"); + } } static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { ezmq_nif_resource_context = enif_open_resource_type(env, "ezmq_nif", - "ezmq_nif_resource_context", - &ezmq_nif_resource_context_cleanup, - ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, - 0); + "ezmq_nif_resource_context", + NULL, + ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, + 0); ezmq_nif_resource_socket = enif_open_resource_type(env, "ezmq_nif", - "ezmq_nif_resource_socket", - &ezmq_nif_resource_socket_cleanup, - ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, - 0); + "ezmq_nif_resource_socket", + NULL, + ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, + 0); return 0; } diff --git a/perf/local_lat.erl b/perf/local_lat.erl index a7e8107..83c2ec6 100755 --- a/perf/local_lat.erl +++ b/perf/local_lat.erl @@ -13,5 +13,7 @@ main([BindTo,MessageSizeStr,RoundtripCountStr]) -> RMsg = Msg, ezmq:send(Socket, Msg) end, - [ Do() || _I <- lists:seq(1,RoundtripCount) ]. + [ Do() || _I <- lists:seq(1,RoundtripCount) ], + ezmq:close(Socket), + ezmq:term(Context). diff --git a/perf/local_thr.erl b/perf/local_thr.erl index 97f532f..5a68ec6 100755 --- a/perf/local_thr.erl +++ b/perf/local_thr.erl @@ -21,5 +21,8 @@ main([BindTo,MessageSizeStr,MessageCountStr]) -> "message count: ~p~n" "mean throughput: ~p [msg/s]~n" "mean throughput: ~p [Mb/s]~n", - [MessageSize, MessageCount, Throughput, Megabits]). + [MessageSize, MessageCount, Throughput, Megabits]), + + ezmq:close(Socket), + ezmq:term(Context). diff --git a/perf/remote_lat.erl b/perf/remote_lat.erl index d533f03..f4cd13a 100755 --- a/perf/remote_lat.erl +++ b/perf/remote_lat.erl @@ -22,4 +22,6 @@ main([ConnectTo,MessageSizeStr,RoundtripCountStr]) -> io:format("message size: ~p [B]~n" "roundtrip count: ~p~n" "average latency: ~p [us]~n", - [MessageSize, RoundtripCount, Latency]). + [MessageSize, RoundtripCount, Latency]), + ezmq:close(Socket), + ezmq:term(Context). diff --git a/perf/remote_thr.erl b/perf/remote_thr.erl index c949201..73c1af6 100755 --- a/perf/remote_thr.erl +++ b/perf/remote_thr.erl @@ -8,5 +8,6 @@ main([ConnectTo,MessageSizeStr,MessageCountStr]) -> {ok, Socket} = ezmq:socket(Context,pub), ezmq:connect(Socket, ConnectTo), Msg = list_to_binary(lists:duplicate(MessageSize, 0)), - ezmq_perf:send_loop(MessageCount, Socket, Msg). - + ezmq_perf:send_loop(MessageCount, Socket, Msg), + ezmq:close(Socket), + ezmq:term(Context). diff --git a/src/ezmq.erl b/src/ezmq.erl index 4c7c2a5..d865682 100644 --- a/src/ezmq.erl +++ b/src/ezmq.erl @@ -1,6 +1,6 @@ -module(ezmq). -include_lib("ezmq.hrl"). --export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2]). +-export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]). context() -> context(1). @@ -52,6 +52,12 @@ setsockopt(Socket, Name, Value) -> getsockopt(Socket, Name) -> ezmq_nif:getsockopt(Socket, option_name(Name)). +close(Socket) -> + ezmq_nif:close(Socket). + +term(Context) -> + ezmq_nif:term(Context). + %% Private socket_type(pair) -> diff --git a/src/ezmq_nif.erl b/src/ezmq_nif.erl index 876d6c5..289718f 100644 --- a/src/ezmq_nif.erl +++ b/src/ezmq_nif.erl @@ -1,6 +1,6 @@ -module(ezmq_nif). --export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2]). +-export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]). -on_load(init/0). @@ -42,3 +42,9 @@ setsockopt(_Socket, _OptionName, _OptionValue) -> getsockopt(_Socket, _OptionName) -> erlang:nif_error(not_loaded). + +close(_Socket) -> + erlang:nif_error(not_loaded). + +term(_Context) -> + erlang:nif_error(not_loaded).