|
| 1 | +-module(erlzmq_device). |
| 2 | + |
| 3 | +-export([queue/2]). |
| 4 | + |
| 5 | +-import(proplists, [get_bool/2]). |
| 6 | + |
| 7 | +%%-------------------------------------------------------------------- |
| 8 | +%% @doc A queue device implemented in Erlang. |
| 9 | +%% |
| 10 | +%% Frontend and Backend must be sockets in active mode. |
| 11 | +%% |
| 12 | +%% This function will not return. |
| 13 | +%% |
| 14 | +%% @spec queue(Frontend, Backend) -> any() |
| 15 | +%% Frontend = erlzmq_socket() |
| 16 | +%% Backend = erlzmq_socket() |
| 17 | +%% @end |
| 18 | +%%-------------------------------------------------------------------- |
| 19 | + |
| 20 | +queue(Frontend, Backend) -> |
| 21 | + receive |
| 22 | + {zmq, Frontend, Msg, Flags} -> |
| 23 | + Parts = lists:reverse(queue_recv_acc(Frontend, Flags, [Msg])), |
| 24 | + queue_send(Backend, Parts); |
| 25 | + {zmq, Backend, Msg, Flags} -> |
| 26 | + Parts = lists:reverse(queue_recv_acc(Backend, Flags, [Msg])), |
| 27 | + queue_send(Frontend, Parts) |
| 28 | + end, |
| 29 | + queue(Frontend, Backend). |
| 30 | + |
| 31 | +%%-------------------------------------------------------------------- |
| 32 | +%% @doc Accumulates messages from Socket. |
| 33 | +%% @spec queue_recv_acc(Socket, Flags, Acc0) -> Acc |
| 34 | +%% @end |
| 35 | +%%-------------------------------------------------------------------- |
| 36 | + |
| 37 | +queue_recv_acc(Socket, Flags0, Acc) -> |
| 38 | + case get_bool(rcvmore, Flags0) of |
| 39 | + true -> |
| 40 | + receive |
| 41 | + {zmq, Socket, Msg, Flags} -> |
| 42 | + queue_recv_acc(Socket, Flags, [Msg|Acc]) |
| 43 | + end; |
| 44 | + false -> Acc |
| 45 | + end. |
| 46 | + |
| 47 | +%%-------------------------------------------------------------------- |
| 48 | +%% @doc Sends a multipart message to Out. |
| 49 | +%% @spec queue_send(erlzmq_socket(), Parts) -> ok |
| 50 | +%% @end |
| 51 | +%%-------------------------------------------------------------------- |
| 52 | + |
| 53 | +queue_send(Out, [LastPart]) -> |
| 54 | + ok = erlzmq:send(Out, LastPart); |
| 55 | +queue_send(Out, [Part|Rest]) -> |
| 56 | + ok = erlzmq:send(Out, Part, [sndmore]), |
| 57 | + queue_send(Out, Rest). |
0 commit comments