Skip to content

Commit

Permalink
Provide a 2tuple/index envelope for matching on erlzmq sockets return…
Browse files Browse the repository at this point in the history
…ed from the NIF.
  • Loading branch information
okeuday committed Mar 25, 2011
1 parent fa6cb33 commit 9ed06d9
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 91 deletions.
16 changes: 13 additions & 3 deletions c_src/erlzmq_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef struct erlzmq_context {
void * ipc_socket;
char * ipc_socket_name;
int running;
int64_t socket_index;
ErlNifCond * cond;
ErlNifMutex * mutex;
ErlNifTid polling_tid;
Expand All @@ -49,6 +50,7 @@ typedef struct erlzmq_context {

typedef struct erlzmq_socket {
erlzmq_context_t * context;
int64_t socket_index;
void * socket_zmq;
int active;
} erlzmq_socket_t;
Expand Down Expand Up @@ -121,6 +123,7 @@ NIF(erlzmq_nif_context)
zmq_bind(handle->ipc_socket,socket_id);

handle->running = 0;
handle->socket_index = 1;
handle->mutex = enif_mutex_create("erlzmq_context_t_mutex");
handle->cond = enif_cond_create("erlzmq_context_t_cond");

Expand Down Expand Up @@ -174,12 +177,15 @@ NIF(erlzmq_nif_socket)
sizeof(erlzmq_socket_t));

handle->context = ctx;
handle->socket_index = ctx->socket_index++;
handle->socket_zmq = zmq_socket(ctx->context_zmq, socket_type);
handle->active = active;

ERL_NIF_TERM result = enif_make_resource(env, handle);
ERL_NIF_TERM socket =
enif_make_tuple2(env, enif_make_uint64(env, handle->socket_index),
enif_make_resource(env, handle));

return enif_make_tuple2(env, enif_make_atom(env, "ok"), result);
return enif_make_tuple2(env, enif_make_atom(env, "ok"), socket);
}

NIF(erlzmq_nif_bind)
Expand Down Expand Up @@ -647,10 +653,14 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);

if (r->socket->active == ERLZMQ_ACTIVE_ON) {
ERL_NIF_TERM socket =
enif_make_tuple2(r->env,
enif_make_uint64(r->env, r->socket->socket_index),
enif_make_resource(r->env, r->socket));
enif_send(NULL, &r->pid, r->env,
enif_make_tuple3(r->env,
enif_make_atom(r->env, "zmq"),
enif_make_resource(r->env, r->socket),
socket,
enif_make_binary(r->env, &binary)));
}
else {
Expand Down
4 changes: 2 additions & 2 deletions src/erlzmq.app.src
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{application, erlzmq,
[
{description, ""},
{description, "Erlang ZeroMQ Driver"},
{vsn, "2.0"},
{modules, [erlzmq, erlzmq_nif]},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { erlzmq_app, []}},
{env, []}
]}.
187 changes: 104 additions & 83 deletions src/erlzmq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,26 @@
-module(erlzmq).
%% @headerfile "erlzmq.hrl"
-include_lib("erlzmq.hrl").
-export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3,
recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1, term/2]).
-export([context/0,
context/1,
socket/2,
bind/2,
connect/2,
send/2,
send/3,
recv/1,
recv/2,
setsockopt/3,
getsockopt/2,
close/1,
term/1,
term/2]).
-export_type([erlzmq_socket/0, erlzmq_context/0]).

%% @equiv context(1)
%% @spec context() -> {ok, erlzmq_context()} | erlzmq_error()
-spec context() -> {ok, erlzmq_context()} | erlzmq_error().
-spec context() ->
{ok, erlzmq_context()} |
erlzmq_error().
context() ->
context(1).

Expand All @@ -46,9 +59,9 @@ context() ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq-init">zmq_init</a></i>
%% @end
%% @spec context(pos_integer()) -> {ok, erlzmq_context()} | erlzmq_error()
-spec context(Threads :: pos_integer()) -> {ok, erlzmq_context()} | erlzmq_error().

-spec context(Threads :: pos_integer()) ->
{ok, erlzmq_context()} |
erlzmq_error().
context(Threads) when is_integer(Threads) ->
erlzmq_nif:context(Threads).

Expand All @@ -65,9 +78,12 @@ context(Threads) when is_integer(Threads) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_socket">zmq_socket</a>.</i>
%% @end
%% @spec socket(erlzmq_context(), erlzmq_socket_type() | list(erlzmq_socket_type() | {active, boolean()})) -> {ok, erlzmq_socket()} | erlzmq_error()
-spec socket(Context :: erlzmq_context(), Type :: erlzmq_socket_type() | list(erlzmq_socket_type() | {active, boolean()})) -> {ok, erlzmq_socket()} | erlzmq_error().

-spec socket(Context :: erlzmq_context(),
Type :: erlzmq_socket_type() |
list(erlzmq_socket_type() |
{active, boolean()})) ->
{ok, {pos_integer(), erlzmq_socket()}} |
erlzmq_error().
socket(Context, Type) when is_atom(Type) ->
socket(Context, [Type]);
socket(Context, [H | _] = L) ->
Expand All @@ -90,39 +106,47 @@ socket(Context, [H | _] = L) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_bind">zmq_bind</a>.</i>
%% @end
%% @spec bind(erlzmq_socket(), erlzmq_endpoint()) -> ok | erlzmq_error()
-spec bind(Socket :: erlzmq_socket(), Endpoint :: erlzmq_endpoint()) -> ok | erlzmq_error().

bind(Socket, Endpoint) when is_list(Endpoint) ->
erlzmq_result(erlzmq_nif:bind(Socket, Endpoint)).
-spec bind(SocketTuple :: {pos_integer(), erlzmq_socket()},
Endpoint :: erlzmq_endpoint()) ->
ok |
erlzmq_error().
bind({I, Socket}, Endpoint)
when is_integer(I), is_list(Endpoint) ->
erlzmq_nif:bind(Socket, Endpoint).

%% @doc Connect a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_connect">zmq_connect</a>.</i>
%% @end
%% @spec connect(erlzmq_socket(), erlzmq_endpoint()) -> ok | erlzmq_error()
-spec connect(Socket :: erlzmq_socket(), Endpoint :: erlzmq_endpoint()) -> ok | erlzmq_error().

connect(Socket, Endpoint) when is_list(Endpoint) ->
erlzmq_result(erlzmq_nif:connect(Socket, Endpoint)).
-spec connect(SocketTuple :: {pos_integer(), erlzmq_socket()},
Endpoint :: erlzmq_endpoint()) ->
ok |
erlzmq_error().
connect({I, Socket}, Endpoint)
when is_integer(I), is_list(Endpoint) ->
erlzmq_nif:connect(Socket, Endpoint).

%% @equiv send(Socket, Msg, [])
%% @spec send(erlzmq_socket(), erlzmq_data()) -> ok | erlzmq_error()
-spec send(Socket :: erlzmq_socket(), Data :: erlzmq_data()) -> ok | erlzmq_error().

send(Socket, Binary) when is_binary(Binary) ->
send(Socket, Binary, []).
-spec send(SocketTuple :: {pos_integer(), erlzmq_socket()},
Data :: erlzmq_data()) ->
ok |
erlzmq_error().
send(SocketTuple, Binary) when is_binary(Binary) ->
send(SocketTuple, Binary, []).

%% @doc Send a message on a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_send">zmq_send</a>.</i>
%% @end
%% @spec send(ezma_socket(), erlzmq_data(), erlzmq_send_recv_flags()) -> ok | erlzmq_error()
-spec send(Socket :: erlzmq_socket(), Data :: erlzmq_data(), Flags :: erlzmq_send_recv_flags()) -> ok | erlzmq_error().

send(Socket, Binary, Flags) when is_binary(Binary), is_list(Flags) ->
-spec send(SocketTuple :: {pos_integer(), erlzmq_socket()},
Data :: erlzmq_data(),
Flags :: erlzmq_send_recv_flags()) ->
ok |
erlzmq_error().
send({I, Socket}, Binary, Flags)
when is_integer(I), is_binary(Binary), is_list(Flags) ->
case erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags)) of
Ref when is_reference(Ref) ->
receive
Expand All @@ -132,78 +156,83 @@ send(Socket, Binary, Flags) when is_binary(Binary), is_list(Flags) ->
Error
end;
Result ->
erlzmq_result(Result)
Result
end.

%% @equiv recv(Socket, 0)
%% @spec recv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error()
-spec recv(Socket :: erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error().

recv(Socket) ->
recv(Socket, []).
-spec recv(SocketTuple :: {pos_integer(), erlzmq_socket()}) ->
{ok, erlzmq_data()} |
erlzmq_error().
recv(SocketTuple) ->
recv(SocketTuple, []).

%% @doc Receive a message from a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_recv">zmq_recv</a>.</i>
%% @end
%% @spec recv(erlzmq_socket(), erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error()
-spec recv(Socket :: erlzmq_socket(), Flags :: erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error() | {error, timeout, reference()}.

recv(Socket, Flags) when is_list(Flags) ->
-spec recv(SocketTuple :: {pos_integer(), erlzmq_socket()},
Flags :: erlzmq_send_recv_flags()) ->
{ok, erlzmq_data()} |
erlzmq_error() |
{error, {timeout, reference()}}.
recv({I, Socket}, Flags)
when is_integer(I), is_list(Flags) ->
case erlzmq_nif:recv(Socket, sendrecv_flags(Flags)) of
Ref when is_reference(Ref) ->
Timeout = proplists:get_value(timeout, Flags, infinity),
receive
{Ref, Result} ->
{ok, Result}
after Timeout ->
{error, timeout, Ref}
{error, {timeout, Ref}}
end;
Result ->
erlzmq_result(Result)
Result
end.

%% @doc Set an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_setsockopt">zmq_setsockopt</a>.</i>
%% @end
%% @spec setsockopt(erlzmq_socket(), erlzmq_sockopt(), erlzmq_sockopt_value()) -> ok | erlzmq_error()
-spec setsockopt(Socket :: erlzmq_socket(), Name :: erlzmq_sockopt(), erlzmq_sockopt_value()) -> ok | erlzmq_error().

setsockopt(Socket, Name, Value) when is_list(Value) ->
setsockopt(Socket, Name, erlang:list_to_binary(Value));
setsockopt(Socket, Name, Value) when is_atom(Name) ->
erlzmq_result(erlzmq_nif:setsockopt(Socket, option_name(Name), Value)).
-spec setsockopt(SocketTuple :: {pos_integer(), erlzmq_socket()},
Name :: erlzmq_sockopt(),
erlzmq_sockopt_value()) ->
ok |
erlzmq_error().
setsockopt(SocketTuple, Name, Value) when is_list(Value) ->
setsockopt(SocketTuple, Name, erlang:list_to_binary(Value));
setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
erlzmq_nif:setsockopt(Socket, option_name(Name), Value).

%% @doc Get an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_getsockopt">zmq_getsockopt</a>.</i>
%% @end
%% @spec getsockopt(erlzmq_socket(), erlzmq_sockopt()) -> {ok, erlzmq_sockopt_value()} | erlzmq_error()
-spec getsockopt(Socket :: erlzmq_socket(), Name :: erlzmq_sockopt()) -> {ok, erlzmq_sockopt_value()} | erlzmq_error().

getsockopt(Socket, Name) when is_atom(Name) ->
erlzmq_result(erlzmq_nif:getsockopt(Socket, option_name(Name))).

-spec getsockopt(SocketTuple :: {pos_integer(), erlzmq_socket()},
Name :: erlzmq_sockopt()) ->
{ok, erlzmq_sockopt_value()} |
erlzmq_error().
getsockopt({I, Socket}, Name) when is_integer(I), is_atom(Name) ->
erlzmq_nif:getsockopt(Socket, option_name(Name)).

%% @doc Close the given socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_close">zmq_close</a>.</i>
%% @end
%% @spec close(erlzmq_socket()) -> ok | erlzmq_error()
-spec close(Socket :: erlzmq_socket()) -> ok | erlzmq_error().

close(Socket) ->
erlzmq_result(erlzmq_nif:close(Socket)).
-spec close(SocketTuple :: {pos_integer(), erlzmq_socket()}) ->
ok |
erlzmq_error().
close({I, Socket}) when is_integer(I) ->
erlzmq_nif:close(Socket).

%% @equiv term(Context, infinity)
%% @spec term(erlzmq_context()) -> ok | erlzmq_error()
-spec term(Context :: erlzmq_context()) -> ok | erlzmq_error().

-spec term(Context :: erlzmq_context()) ->
ok |
erlzmq_error().
term(Context) ->
term(Context, infinity).

Expand All @@ -216,8 +245,11 @@ term(Context) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_term">zmq_term</a>.</i>
%% @end
%% @spec term(erlzmq_context(), timeout()) -> ok | erlzmq_error()
-spec term(Context :: erlzmq_context(), Timeout :: timeout()) -> ok | erlzmq_error() | {error, timeout, reference()}.
-spec term(Context :: erlzmq_context(),
Timeout :: timeout()) ->
ok |
erlzmq_error() |
{error, {timeout, reference()}}.

term(Context, Timeout) ->
case erlzmq_nif:term(Context) of
Expand All @@ -226,16 +258,17 @@ term(Context, Timeout) ->
{Ref, Result} ->
Result
after Timeout ->
{error, timeout, Ref}
{error, {timeout, Ref}}
end;
Result ->
erlzmq_result(Result)
Result
end.


%% Private

-spec socket_type(Type :: erlzmq_socket_type()) -> integer().
-spec socket_type(Type :: erlzmq_socket_type()) ->
integer().

socket_type(pair) ->
?'ZMQ_PAIR';
Expand All @@ -260,7 +293,8 @@ socket_type(xpub) ->
socket_type(xsub) ->
?'ZMQ_XSUB'.

-spec sendrecv_flags(Flags :: erlzmq_send_recv_flags()) -> integer().
-spec sendrecv_flags(Flags :: erlzmq_send_recv_flags()) ->
integer().

sendrecv_flags([]) ->
0;
Expand All @@ -271,7 +305,8 @@ sendrecv_flags([noblock|Rest]) ->
sendrecv_flags([sndmore|Rest]) ->
?'ZMQ_SNDMORE' bor sendrecv_flags(Rest).

-spec option_name(Name :: erlzmq_sockopt()) -> integer().
-spec option_name(Name :: erlzmq_sockopt()) ->
integer().

option_name(hwm) ->
?'ZMQ_HWM';
Expand Down Expand Up @@ -312,17 +347,3 @@ option_name(recovery_ivl_msec) ->
option_name(reconnect_ivl_max) ->
?'ZMQ_RECONNECT_IVL_MAX'.


-spec erlzmq_result(ok) -> ok;
({ok, Value :: term()}) -> Value :: term();
({error, Value :: atom()}) -> Value :: atom();
({error, integer()}) -> {error, erlzmq_error_type()}.

erlzmq_result(ok) ->
ok;
erlzmq_result({ok, _} = Result) ->
Result;
erlzmq_result({error, Code} = Error) when is_atom(Code) ->
Error;
erlzmq_result({error, Code} = Error) when is_integer(Code) ->
Error.
Loading

0 comments on commit 9ed06d9

Please sign in to comment.