Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion include/types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,11 @@
%% atom names saves a little bit of bandwidth.
-define(CAST(M, F, A), {cast, M, F, A}).
-define(ORDERED_CAST(M, F, A), {oc, M, F, A}).
-define(ABCAST(N, M), {abcast, N, M}).

-define(IS_CAST(MSG), ((MSG) =:= cast orelse (MSG) =:= oc)).
-define(IS_CAST_MSG(MSG),
is_tuple(MSG) andalso
(element(1, MSG) =:= abcast orelse
element(1, MSG) =:= cast orelse
element(1, MSG) =:= oc)
).
26 changes: 15 additions & 11 deletions src/gen_rpc_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,17 @@ waiting_for_data(info, {Driver,Socket,Data},
?ORDERED_CAST(M, F, A) ->
handle_cast(M, F, A, true, State),
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
?ABCAST(N, M) ->
handle_abcast(N, M, State),
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
BatchCast when is_list(BatchCast) ->
lists:foreach(fun(?CAST(M, F, A)) -> handle_cast(M, F, A, false, State);
(?ORDERED_CAST(M, F, A)) -> handle_cast(M, F, A, true, State);
(?ABCAST(N, M)) -> handle_abcast(N, M, State);
(Invalid) -> ?tp(error, gen_rpc_invalid_batch, #{socket => gen_rpc_helper:socket_to_string(Socket), data => Invalid})
end,
BatchCast),
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
{abcast, Name, Msg} ->
_Result = case check_if_module_allowed(erlang, Control, List) of
true ->
?log(debug, "event=abcast_received driver=~s socket=\"~s\" peer=\"~s\" process=~s message=\"~p\"",
[Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Name, Msg]),
Msg = erlang:send(Name, Msg);
false ->
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=~s",
[Driver, gen_rpc_helper:socket_to_string(Socket), Control, abcast])
end,
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
{sbcast, Name, Msg, Caller} ->
Reply = case check_if_module_allowed(erlang, Control, List) of
true ->
Expand Down Expand Up @@ -350,6 +343,17 @@ handle_cast(M, F, A, Ordered, #state{socket=Socket, driver=Driver, peer=Peer, co
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=cast module=~s",[Driver, gen_rpc_helper:socket_to_string(Socket), Control, RealM])
end.

handle_abcast(Name, Msg, #state{socket=Socket, driver=Driver, peer=Peer, control=Control, list=List}) ->
case check_if_module_allowed(erlang, Control, List) of
true ->
?log(debug, "event=abcast_received driver=~s socket=\"~s\" peer=\"~s\" process=~s message=\"~p\"",
[Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Name, Msg]),
Msg = erlang:send({Name, node()}, Msg);
false ->
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=~s",
[Driver, gen_rpc_helper:socket_to_string(Socket), Control, abcast])
end.

exec_cast(M, F, A, _PreserveOrder = true) ->
{Pid, MRef} = erlang:spawn_monitor(M, F, A),
receive
Expand Down
32 changes: 17 additions & 15 deletions src/gen_rpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ abcast(Name, Msg) when is_atom(Name) ->

-spec abcast(list(), atom(), term()) -> abcast.
abcast(Nodes, Name, Msg) when is_list(Nodes), is_atom(Name) ->
_ = [erlang:spawn(?MODULE, cast_worker, [Node, {abcast,Name,Msg}, abcast, undefined]) || Node <- Nodes],
_ = [do_abcast(Node, Name, Msg) || Node <- Nodes],
abcast.

-spec sbcast(atom(), term()) -> {list(), list()}.
Expand Down Expand Up @@ -330,18 +330,14 @@ handle_cast(Msg, #state{socket=Socket, driver=Driver} = State) ->
{stop, {unknown_cast, Msg}, State}.

%% This is the actual CAST handler for CAST
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST(Cast) ->
send_cast(PacketTuple, State, SendTimeout, true);
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST(Cast) ->
send_cast(drain_cast(MaxBatchSize, [PacketTuple]), State, SendTimeout, true);

%% This is the actual CAST handler for ABCAST
handle_info({{abcast,_Name,_Msg} = PacketTuple, undefined}, State) ->
send_cast(PacketTuple, State, undefined, false);
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST_MSG(PacketTuple) ->
send_cast(PacketTuple, State, SendTimeout);
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST_MSG(PacketTuple) ->
send_cast(drain_cast(MaxBatchSize, [PacketTuple]), State, SendTimeout);

%% This is the actual CAST handler for SBCAST
handle_info({{sbcast,_Name,_Msg,_Caller} = PacketTuple, undefined}, State) ->
send_cast(PacketTuple, State, undefined, true);
send_cast(PacketTuple, State, undefined);

%% Handle any TCP packet coming in
handle_info({Driver,Socket,Data}, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State) ->
Expand Down Expand Up @@ -420,7 +416,7 @@ terminate(_Reason, #state{keepalive=KeepAlive}) ->
%%% ===================================================
%%% Private functions
%%% ===================================================
send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State, SendTimeout, Activate) ->
send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State, SendTimeout) ->
?tp_ignore_side_effects_in_prod(
gen_rpc_send_packet, #{ packet => PacketTuple
, timeout => SendTimeout
Expand All @@ -439,10 +435,7 @@ send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod
}),
{stop, Reason, State};
ok ->
ok = case Activate of
true -> DriverMod:activate_socket(Socket);
_ -> ok
end,
ok = DriverMod:activate_socket(Socket),
?log(debug, "message=cast event=transmission_succeeded driver=~s socket=\"~s\"",
[Driver, gen_rpc_helper:socket_to_string(Socket)]),
{noreply, State, gen_rpc_helper:get_inactivity_timeout(?MODULE)}
Expand Down Expand Up @@ -578,12 +571,21 @@ drain_cast(N, CastReqs) ->
receive
{?CAST(_M,_F,_A) = Req, _} ->
drain_cast(N-1, [Req | CastReqs]);
{?ABCAST(_N, _M) = Req, _} ->
drain_cast(N-1, [Req | CastReqs]);
{?ORDERED_CAST(_M, _F, _A) = Req, _} ->
drain_cast(N-1, [Req | CastReqs])
after 0 ->
lists:reverse(CastReqs)
end.

do_abcast({Node, _Tag}, Name, Msg) when Node =:= node() ->
Msg = erlang:send({Name, Node}, Msg);
do_abcast(Node, Name, Msg) when Node =:= node() ->
Msg = erlang:send({Name, Node}, Msg);
do_abcast(Node, Name, Msg) ->
erlang:spawn(?MODULE, cast_worker, [Node, {abcast,Name,Msg}, abcast, undefined]).

-spec maybe_start_client(node_or_tuple()) -> {ok, pid()} | {error, any()}.
maybe_start_client(NodeOrTuple) ->
%% Create a unique name for the client because we register as such
Expand Down
2 changes: 1 addition & 1 deletion src/gen_rpc_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
%%% ===================================================

%% term_to_iovec/1 wrapper to conditionally compress based on threshold
-spec term_to_iovec(term()) -> ext_iovec.
-spec term_to_iovec(term()) -> erlang:iovec().
term_to_iovec(Term) ->
{ok, Compress} = application:get_env(?APP, compress),
do_term_to_iovec(Term, Compress).
Expand Down
28 changes: 28 additions & 0 deletions test/local_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,34 @@ call_with_receive_timeout(_Config) ->
call_with_worker_kill(_Config) ->
{badrpc, killed} = gen_rpc:call(?MASTER, timer, kill_after, [0]).

abcast(_Config) ->
true = erlang:register(test_process_123, self()),
abcast = gen_rpc:abcast([node()], test_process_123, this_is_a_test),
receive
this_is_a_test -> ok;
_ -> erlang:error(invalid_message)
after
2000 -> erlang:error(timeout)
end.

abcast_with_key(_Config) ->
true = erlang:register(test_process_123, self()),
abcast = gen_rpc:abcast([{node(), 123}], test_process_123, this_is_a_test),
receive
this_is_a_test -> ok;
_ -> erlang:error(invalid_message)
after
2000 -> erlang:error(timeout)
end.

abcast_with_unregistered_name(_Config) ->
abcast = gen_rpc:abcast([node()], test_process_123, this_is_a_test),
receive
_ -> erlang:error(invalid_message)
after
500 -> ok
end.

%% call_module_version_check_success(_Config) ->
%% stub_function = gen_rpc:call(?MASTER, {gen_rpc_test_helper, "1.0.0"}, stub_function, []).

Expand Down
8 changes: 8 additions & 0 deletions test/multi_rpc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ abcast_with_bad_server(_Config) ->
end,
true = erlang:unregister(test_process_123).

abcast_with_unregistered_name(_Config) ->
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[?MASTER, ?FAKE_NODE], test_process_123, this_is_a_test]),
receive
_ -> erlang:error(invalid_message)
after
2000 -> ok
end.

sbcast(_Config) ->
true = erlang:register(test_process_123, self()),
{[?MASTER], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[?MASTER], test_process_123, this_is_a_test]),
Expand Down
8 changes: 8 additions & 0 deletions test/multi_rpc_with_key_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ abcast_with_bad_server(_Config) ->
end,
true = erlang:unregister(test_process_123).

abcast_with_unregistered_name(_Config) ->
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[{?MASTER,random_key}, ?FAKE_NODE], test_process_123, this_is_a_test]),
receive
_ -> erlang:error(invalid_message)
after
2000 -> ok
end.

sbcast(_Config) ->
true = erlang:register(test_process_123, self()),
{[{?MASTER,random_key}], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[{?MASTER,random_key}], test_process_123, this_is_a_test]),
Expand Down