Skip to content

Commit

Permalink
Add {active_pid, pid()} option to socket/2, which implies {active, tr…
Browse files Browse the repository at this point in the history
…ue}. The change provides a controlling_process concept for active mode sockets (so that the code doesn't assume self() is always the controlling process).
  • Loading branch information
okeuday committed Feb 9, 2013
1 parent 4f2350b commit a45c2ad
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
13 changes: 10 additions & 3 deletions c_src/erlzmq_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef struct erlzmq_socket {
int64_t socket_index;
void * socket_zmq;
int active;
ErlNifPid active_pid;
ErlNifMutex * mutex;
} erlzmq_socket_t;

Expand Down Expand Up @@ -115,7 +116,7 @@ static ERL_NIF_TERM return_zmq_errno(ErlNifEnv* env, int const value);
static ErlNifFunc nif_funcs[] =
{
{"context", 1, erlzmq_nif_context},
{"socket", 3, erlzmq_nif_socket},
{"socket", 4, erlzmq_nif_socket},
{"bind", 2, erlzmq_nif_bind},
{"connect", 2, erlzmq_nif_connect},
{"setsockopt", 3, erlzmq_nif_setsockopt},
Expand Down Expand Up @@ -180,6 +181,7 @@ NIF(erlzmq_nif_socket)
erlzmq_context_t * context;
int socket_type;
int active;
ErlNifPid active_pid;

if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_context,
(void **) &context)) {
Expand All @@ -194,6 +196,10 @@ NIF(erlzmq_nif_socket)
return enif_make_badarg(env);
}

if (! enif_get_local_pid(env, argv[3], &active_pid)) {
return enif_make_badarg(env);
}

erlzmq_socket_t * socket = enif_alloc_resource(erlzmq_nif_resource_socket,
sizeof(erlzmq_socket_t));
assert(socket);
Expand All @@ -204,6 +210,7 @@ NIF(erlzmq_nif_socket)
return return_zmq_errno(env, zmq_errno());
}
socket->active = active;
socket->active_pid = active_pid;
socket->mutex = enif_mutex_create("erlzmq_socket_t_mutex");
assert(socket->mutex);

Expand Down Expand Up @@ -796,7 +803,7 @@ static void * polling_thread(void * handle)
{
enif_mutex_unlock(r->data.recv.socket->mutex);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
enif_make_tuple3(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
Expand Down Expand Up @@ -833,7 +840,7 @@ static void * polling_thread(void * handle)
flags_list = enif_make_list(r->data.recv.env, 0);
}

enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
Expand Down
13 changes: 9 additions & 4 deletions src/erlzmq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ context(Threads) when is_integer(Threads) ->
-spec socket(Context :: erlzmq_context(),
Type :: erlzmq_socket_type() |
list(erlzmq_socket_type() |
{active, boolean()})) ->
{active, boolean()} |
{active_pid, pid()})) ->
{ok, erlzmq_socket()} |
erlzmq_error().
socket(Context, Type) when is_atom(Type) ->
Expand All @@ -112,14 +113,18 @@ socket(Context, [H | [Type]]) when is_tuple(H) ->

-spec socket(Context :: erlzmq_context(),
Type :: erlzmq_socket_type(),
{active, boolean()}) ->
{active, boolean()} | {active_pid, pid()}) ->
{ok, erlzmq_socket()} |
erlzmq_error().
socket(Context, Type, {active, true}) ->
true = (Type =/= pub) and (Type =/= push) and (Type =/= xpub),
erlzmq_nif:socket(Context, socket_type(Type), 1);
erlzmq_nif:socket(Context, socket_type(Type), 1, self());
socket(Context, Type, {active_pid, Pid})
when is_pid(Pid), node(Pid) =:= node() ->
true = (Type =/= pub) and (Type =/= push) and (Type =/= xpub),
erlzmq_nif:socket(Context, socket_type(Type), 1, Pid);
socket(Context, Type, {active, false}) ->
erlzmq_nif:socket(Context, socket_type(Type), 0).
erlzmq_nif:socket(Context, socket_type(Type), 0, self()).


%% @doc Accept connections on a socket.
Expand Down
4 changes: 2 additions & 2 deletions src/erlzmq_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
-module(erlzmq_nif).

-export([context/1,
socket/3,
socket/4,
bind/2,
connect/2,
send/3,
Expand Down Expand Up @@ -37,7 +37,7 @@ init() ->
context(_Threads) ->
erlang:nif_error(not_loaded).

socket(_Context, _Type, _Active) ->
socket(_Context, _Type, _Active, _ActivePid) ->
erlang:nif_error(not_loaded).

bind(_Socket, _Endpoint) ->
Expand Down

0 comments on commit a45c2ad

Please sign in to comment.