Skip to content

Commit 5aea098

Browse files
authored
fix: include abcast as part of the cast batch (#2)
Also handle local abcast
1 parent d161cf2 commit 5aea098

File tree

7 files changed

+84
-28
lines changed

7 files changed

+84
-28
lines changed

include/types.hrl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,11 @@
1313
%% atom names saves a little bit of bandwidth.
1414
-define(CAST(M, F, A), {cast, M, F, A}).
1515
-define(ORDERED_CAST(M, F, A), {oc, M, F, A}).
16+
-define(ABCAST(N, M), {abcast, N, M}).
1617

17-
-define(IS_CAST(MSG), ((MSG) =:= cast orelse (MSG) =:= oc)).
18+
-define(IS_CAST_MSG(MSG),
19+
is_tuple(MSG) andalso
20+
(element(1, MSG) =:= abcast orelse
21+
element(1, MSG) =:= cast orelse
22+
element(1, MSG) =:= oc)
23+
).

src/gen_rpc_acceptor.erl

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,24 +136,17 @@ waiting_for_data(info, {Driver,Socket,Data},
136136
?ORDERED_CAST(M, F, A) ->
137137
handle_cast(M, F, A, true, State),
138138
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
139+
?ABCAST(N, M) ->
140+
handle_abcast(N, M, State),
141+
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
139142
BatchCast when is_list(BatchCast) ->
140143
lists:foreach(fun(?CAST(M, F, A)) -> handle_cast(M, F, A, false, State);
141144
(?ORDERED_CAST(M, F, A)) -> handle_cast(M, F, A, true, State);
145+
(?ABCAST(N, M)) -> handle_abcast(N, M, State);
142146
(Invalid) -> ?tp(error, gen_rpc_invalid_batch, #{socket => gen_rpc_helper:socket_to_string(Socket), data => Invalid})
143147
end,
144148
BatchCast),
145149
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
146-
{abcast, Name, Msg} ->
147-
_Result = case check_if_module_allowed(erlang, Control, List) of
148-
true ->
149-
?log(debug, "event=abcast_received driver=~s socket=\"~s\" peer=\"~s\" process=~s message=\"~p\"",
150-
[Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Name, Msg]),
151-
Msg = erlang:send(Name, Msg);
152-
false ->
153-
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=~s",
154-
[Driver, gen_rpc_helper:socket_to_string(Socket), Control, abcast])
155-
end,
156-
{keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)};
157150
{sbcast, Name, Msg, Caller} ->
158151
Reply = case check_if_module_allowed(erlang, Control, List) of
159152
true ->
@@ -350,6 +343,17 @@ handle_cast(M, F, A, Ordered, #state{socket=Socket, driver=Driver, peer=Peer, co
350343
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=cast module=~s",[Driver, gen_rpc_helper:socket_to_string(Socket), Control, RealM])
351344
end.
352345

346+
handle_abcast(Name, Msg, #state{socket=Socket, driver=Driver, peer=Peer, control=Control, list=List}) ->
347+
case check_if_module_allowed(erlang, Control, List) of
348+
true ->
349+
?log(debug, "event=abcast_received driver=~s socket=\"~s\" peer=\"~s\" process=~s message=\"~p\"",
350+
[Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Name, Msg]),
351+
Msg = erlang:send({Name, node()}, Msg);
352+
false ->
353+
?log(debug, "event=request_not_allowed driver=~s socket=\"~s\" control=~s method=~s",
354+
[Driver, gen_rpc_helper:socket_to_string(Socket), Control, abcast])
355+
end.
356+
353357
exec_cast(M, F, A, _PreserveOrder = true) ->
354358
{Pid, MRef} = erlang:spawn_monitor(M, F, A),
355359
receive

src/gen_rpc_client.erl

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ abcast(Name, Msg) when is_atom(Name) ->
210210

211211
-spec abcast(list(), atom(), term()) -> abcast.
212212
abcast(Nodes, Name, Msg) when is_list(Nodes), is_atom(Name) ->
213-
_ = [erlang:spawn(?MODULE, cast_worker, [Node, {abcast,Name,Msg}, abcast, undefined]) || Node <- Nodes],
213+
_ = [do_abcast(Node, Name, Msg) || Node <- Nodes],
214214
abcast.
215215

216216
-spec sbcast(atom(), term()) -> {list(), list()}.
@@ -330,18 +330,14 @@ handle_cast(Msg, #state{socket=Socket, driver=Driver} = State) ->
330330
{stop, {unknown_cast, Msg}, State}.
331331

332332
%% This is the actual CAST handler for CAST
333-
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST(Cast) ->
334-
send_cast(PacketTuple, State, SendTimeout, true);
335-
handle_info({{Cast, _M, _F, _A} = PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST(Cast) ->
336-
send_cast(drain_cast(MaxBatchSize, [PacketTuple]), State, SendTimeout, true);
337-
338-
%% This is the actual CAST handler for ABCAST
339-
handle_info({{abcast,_Name,_Msg} = PacketTuple, undefined}, State) ->
340-
send_cast(PacketTuple, State, undefined, false);
333+
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = 0}) when ?IS_CAST_MSG(PacketTuple) ->
334+
send_cast(PacketTuple, State, SendTimeout);
335+
handle_info({PacketTuple, SendTimeout}, State = #state{max_batch_size = MaxBatchSize}) when ?IS_CAST_MSG(PacketTuple) ->
336+
send_cast(drain_cast(MaxBatchSize, [PacketTuple]), State, SendTimeout);
341337

342338
%% This is the actual CAST handler for SBCAST
343339
handle_info({{sbcast,_Name,_Msg,_Caller} = PacketTuple, undefined}, State) ->
344-
send_cast(PacketTuple, State, undefined, true);
340+
send_cast(PacketTuple, State, undefined);
345341

346342
%% Handle any TCP packet coming in
347343
handle_info({Driver,Socket,Data}, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State) ->
@@ -420,7 +416,7 @@ terminate(_Reason, #state{keepalive=KeepAlive}) ->
420416
%%% ===================================================
421417
%%% Private functions
422418
%%% ===================================================
423-
send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State, SendTimeout, Activate) ->
419+
send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State, SendTimeout) ->
424420
?tp_ignore_side_effects_in_prod(
425421
gen_rpc_send_packet, #{ packet => PacketTuple
426422
, timeout => SendTimeout
@@ -439,10 +435,7 @@ send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod
439435
}),
440436
{stop, Reason, State};
441437
ok ->
442-
ok = case Activate of
443-
true -> DriverMod:activate_socket(Socket);
444-
_ -> ok
445-
end,
438+
ok = DriverMod:activate_socket(Socket),
446439
?log(debug, "message=cast event=transmission_succeeded driver=~s socket=\"~s\"",
447440
[Driver, gen_rpc_helper:socket_to_string(Socket)]),
448441
{noreply, State, gen_rpc_helper:get_inactivity_timeout(?MODULE)}
@@ -578,12 +571,21 @@ drain_cast(N, CastReqs) ->
578571
receive
579572
{?CAST(_M,_F,_A) = Req, _} ->
580573
drain_cast(N-1, [Req | CastReqs]);
574+
{?ABCAST(_N, _M) = Req, _} ->
575+
drain_cast(N-1, [Req | CastReqs]);
581576
{?ORDERED_CAST(_M, _F, _A) = Req, _} ->
582577
drain_cast(N-1, [Req | CastReqs])
583578
after 0 ->
584579
lists:reverse(CastReqs)
585580
end.
586581

582+
do_abcast({Node, _Tag}, Name, Msg) when Node =:= node() ->
583+
Msg = erlang:send({Name, Node}, Msg);
584+
do_abcast(Node, Name, Msg) when Node =:= node() ->
585+
Msg = erlang:send({Name, Node}, Msg);
586+
do_abcast(Node, Name, Msg) ->
587+
erlang:spawn(?MODULE, cast_worker, [Node, {abcast,Name,Msg}, abcast, undefined]).
588+
587589
-spec maybe_start_client(node_or_tuple()) -> {ok, pid()} | {error, any()}.
588590
maybe_start_client(NodeOrTuple) ->
589591
%% Create a unique name for the client because we register as such

src/gen_rpc_helper.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
%%% ===================================================
5252

5353
%% term_to_iovec/1 wrapper to conditionally compress based on threshold
54-
-spec term_to_iovec(term()) -> ext_iovec.
54+
-spec term_to_iovec(term()) -> erlang:iovec().
5555
term_to_iovec(Term) ->
5656
{ok, Compress} = application:get_env(?APP, compress),
5757
do_term_to_iovec(Term, Compress).

test/local_SUITE.erl

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,34 @@ call_with_receive_timeout(_Config) ->
9191
call_with_worker_kill(_Config) ->
9292
{badrpc, killed} = gen_rpc:call(?MASTER, timer, kill_after, [0]).
9393

94+
abcast(_Config) ->
95+
true = erlang:register(test_process_123, self()),
96+
abcast = gen_rpc:abcast([node()], test_process_123, this_is_a_test),
97+
receive
98+
this_is_a_test -> ok;
99+
_ -> erlang:error(invalid_message)
100+
after
101+
2000 -> erlang:error(timeout)
102+
end.
103+
104+
abcast_with_key(_Config) ->
105+
true = erlang:register(test_process_123, self()),
106+
abcast = gen_rpc:abcast([{node(), 123}], test_process_123, this_is_a_test),
107+
receive
108+
this_is_a_test -> ok;
109+
_ -> erlang:error(invalid_message)
110+
after
111+
2000 -> erlang:error(timeout)
112+
end.
113+
114+
abcast_with_unregistered_name(_Config) ->
115+
abcast = gen_rpc:abcast([node()], test_process_123, this_is_a_test),
116+
receive
117+
_ -> erlang:error(invalid_message)
118+
after
119+
500 -> ok
120+
end.
121+
94122
%% call_module_version_check_success(_Config) ->
95123
%% stub_function = gen_rpc:call(?MASTER, {gen_rpc_test_helper, "1.0.0"}, stub_function, []).
96124

test/multi_rpc_SUITE.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ abcast_with_bad_server(_Config) ->
197197
end,
198198
true = erlang:unregister(test_process_123).
199199

200+
abcast_with_unregistered_name(_Config) ->
201+
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[?MASTER, ?FAKE_NODE], test_process_123, this_is_a_test]),
202+
receive
203+
_ -> erlang:error(invalid_message)
204+
after
205+
2000 -> ok
206+
end.
207+
200208
sbcast(_Config) ->
201209
true = erlang:register(test_process_123, self()),
202210
{[?MASTER], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[?MASTER], test_process_123, this_is_a_test]),

test/multi_rpc_with_key_SUITE.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ abcast_with_bad_server(_Config) ->
114114
end,
115115
true = erlang:unregister(test_process_123).
116116

117+
abcast_with_unregistered_name(_Config) ->
118+
abcast = rpc:call(?SLAVE, gen_rpc, abcast, [[{?MASTER,random_key}, ?FAKE_NODE], test_process_123, this_is_a_test]),
119+
receive
120+
_ -> erlang:error(invalid_message)
121+
after
122+
2000 -> ok
123+
end.
124+
117125
sbcast(_Config) ->
118126
true = erlang:register(test_process_123, self()),
119127
{[{?MASTER,random_key}], []} = rpc:call(?SLAVE, gen_rpc, sbcast, [[{?MASTER,random_key}], test_process_123, this_is_a_test]),

0 commit comments

Comments
 (0)