Skip to content

Commit 42f9878

Browse files
committed
Re-issue node monitors and reset timers on leader change
1 parent a8a3342 commit 42f9878

File tree

3 files changed

+111
-12
lines changed

3 files changed

+111
-12
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -780,13 +780,15 @@ state_enter(recover, _) ->
780780
put('$rabbit_vm_category', ?MODULE),
781781
[];
782782
state_enter(leader, #?MODULE{streams = Streams,
783-
monitors = Monitors}) ->
783+
monitors = Monitors,
784+
single_active_consumer = SacState}) ->
784785
Pids = maps:keys(Monitors),
785786
%% monitor all the known nodes
786787
Nodes = all_member_nodes(Streams),
787788
NodeMons = [{monitor, node, N} || N <- Nodes],
788-
NodeMons ++ [{aux, fail_active_actions} |
789-
[{monitor, process, P} || P <- Pids]];
789+
SacEffects = ?SAC_CURRENT:state_enter(leader, SacState),
790+
SacEffects ++ NodeMons ++ [{aux, fail_active_actions} |
791+
[{monitor, process, P} || P <- Pids]];
790792
state_enter(_S, _) ->
791793
[].
792794

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
overview/1,
5050
import_state/2,
5151
check_conf_change/1,
52-
list_nodes/1]).
52+
list_nodes/1,
53+
state_enter/2
54+
]).
5355
-export([make_purge_nodes/1,
5456
make_update_conf/1]).
5557

@@ -629,17 +631,20 @@ ensure_monitors(#command_register_consumer{vhost = VirtualHost,
629631
Monitors0,
630632
Effects) ->
631633
GroupId = {VirtualHost, Stream, ConsumerName},
634+
%% get the group IDs that depend on the PID
632635
Groups0 = maps:get(Pid, PidsGroups0, #{}),
633-
PidsGroups1 =
634-
maps:put(Pid, maps:put(GroupId, true, Groups0), PidsGroups0),
636+
%% add the group ID
637+
Groups1 = Groups0#{GroupId => true},
638+
%% update the PID-to-group map
639+
PidsGroups1 = PidsGroups0#{Pid => Groups1},
635640
{State0#?MODULE{pids_groups = PidsGroups1}, Monitors0#{Pid => sac},
636641
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
637642
ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
638643
stream = Stream,
639644
consumer_name = ConsumerName,
640645
connection_pid = Pid},
641-
#?MODULE{groups = StreamGroups0, pids_groups = PidsGroups0} =
642-
State0,
646+
#?MODULE{groups = StreamGroups0,
647+
pids_groups = PidsGroups0} = State0,
643648
Monitors,
644649
Effects)
645650
when is_map_key(Pid, PidsGroups0) ->
@@ -671,8 +676,7 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
671676
maps:remove(Pid, Monitors), [{demonitor, process, Pid} | Effects]};
672677
false ->
673678
%% one or more groups still depend on the PID
674-
{State0#?MODULE{pids_groups =
675-
maps:put(Pid, PidGroup1, PidsGroups0)},
679+
{State0#?MODULE{pids_groups = PidsGroups0#{Pid => PidGroup1}},
676680
Monitors, Effects}
677681
end;
678682
ensure_monitors(#command_connection_reconnected{pid = Pid},
@@ -735,8 +739,7 @@ handle_connection_node_disconnected(ConnPid,
735739
ConnPid, Acc, G)
736740
end, State1, Groups),
737741
T = disconnected_timeout(State2),
738-
{State2, [{timer, {sac, node_disconnected,
739-
#{connection_pid => ConnPid}}, T}]}
742+
{State2, [node_disconnected_timer_effect(ConnPid, T)]}
740743
end.
741744

742745
-spec handle_node_reconnected(node(), state(), ra_machine:effects()) ->
@@ -885,6 +888,44 @@ list_nodes(#?MODULE{groups = Groups}) ->
885888
end, #{}, Groups),
886889
maps:keys(Nodes).
887890

891+
%% Effects to emit when the machine enters a new Ra server state.
%%
%% On `leader', with a state that is the current record, every consumer of
%% every group is visited and the function returns:
%%   * one `{monitor, node, Node}' effect per distinct node hosting a
%%     consumer connection PID, followed by
%%   * one `node_disconnected' timer effect per distinct connection PID that
%%     has at least one consumer whose status is `{?DISCONNECTED, _}'.
%% For any other Ra state, or a state term that is not the current record,
%% no effects are returned.
-spec state_enter(ra_server:ra_state(), state() | term()) ->
    ra_machine:effects().
state_enter(leader, #?MODULE{groups = Groups} = State)
  when is_record(State, ?MODULE) ->
    %% iterate over groups
    {Nodes, DisConns} =
        maps:fold(
          fun(_, #group{consumers = Cs}, Acc) ->
                  %% iterate over group consumers
                  lists:foldl(
                    fun(#consumer{pid = P,
                                  status = {?DISCONNECTED, _},
                                  ts = Ts},
                        {NodesAcc, DisConnsAcc}) ->
                            %% disconnected consumer,
                            %% store connection PID and node
                            {NodesAcc#{node(P) => true},
                             DisConnsAcc#{P => Ts}};
                       (#consumer{pid = P}, {NodesAcc, DisConnsAcc}) ->
                            %% store connection node only
                            {NodesAcc#{node(P) => true}, DisConnsAcc}
                    end, Acc, Cs)
          end, {#{}, #{}}, Groups),
    DisTimeout = disconnected_timeout(State),
    %% monitor involved nodes
    %% reset a timer for disconnected connections
    [{monitor, node, N} || N <- maps:keys(Nodes)] ++
    [begin
         Time = case ts() - Ts of
                    T when T < 10_000 ->
                        %% 10 seconds is arbitrary, nothing specific about the value
                        10_000;
                    T when T > DisTimeout ->
                        DisTimeout;
                    T ->
                        %% 10_000 =< T =< DisTimeout: this range previously
                        %% had no clause and crashed with case_clause.
                        %% Using T completes the clamp to [10_000, DisTimeout].
                        %% NOTE(review): confirm whether the remaining time
                        %% (DisTimeout - T) was intended instead.
                        T
                end,
         node_disconnected_timer_effect(P, Time)
     end || P := Ts <- DisConns];
state_enter(_, _) ->
    [].
928+
888929
nodes_from_group(#group{consumers = Cs}) when is_list(Cs) ->
889930
lists:foldl(fun(#consumer{pid = Pid}, Acc) ->
890931
Acc#{node(Pid) => true}
@@ -1295,6 +1336,10 @@ csr(Pid, Id, Owner, Status) ->
12951336
%% Returns consumer record C with its status replaced by Status and its
%% `ts' field refreshed with the current value of ts/0.
csr_status(C, Status) ->
    Now = ts(),
    C#consumer{ts = Now, status = Status}.
12971338

1339+
%% Builds the `timer' effect tuple for a disconnected connection: the timer
%% name carries the connection PID and T is passed through as the timer value.
node_disconnected_timer_effect(Pid, T) ->
    TimerName = {sac, node_disconnected, #{connection_pid => Pid}},
    {timer, TimerName, T}.
1342+
12981343
%% Current Erlang system time, expressed in milliseconds.
ts() ->
    Unit = millisecond,
    erlang:system_time(Unit).
13001345

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,6 +1534,58 @@ list_nodes_test(_) ->
15341534
stop_node(N2Pid),
15351535
ok.
15361536

1537+
%% Exercises ?MOD:state_enter/2 with consumers spread over the local node and
%% two extra nodes started for the test:
%%   * a non-leader state yields no effects,
%%   * becoming leader yields one node monitor per distinct consumer node,
%%   * each connection with a disconnected consumer also gets one timer effect.
state_enter_test(_) ->
    LocalNode = node(),
    {Peer1, Node1} = start_node(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "1")),
    {Peer2, Node2} = start_node(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "2")),

    Conn0 = new_process(LocalNode),
    Conn1 = new_process(Node1),
    Conn2 = new_process(Node2),

    Grp0 = group_id(<<"sO">>),
    Grp1 = group_id(<<"s1">>),
    Grp2 = group_id(<<"s2">>),

    %% anything but `leader' produces no effects
    assertEmpty(?MOD:state_enter(follower, #{})),

    %% one connection per node: all three nodes are monitored
    AllConnected = #{Grp0 => grp([csr(Conn0), csr(Conn0), csr(Conn0)]),
                     Grp1 => grp([csr(Conn1), csr(Conn1), csr(Conn1)]),
                     Grp2 => grp([csr(Conn2), csr(Conn2), csr(Conn2)])},
    ?assertEqual(mon_node_eff([LocalNode, Node1, Node2]),
                 state_enter_leader(AllConnected)),

    %% nodes shared between groups are monitored only once
    SharedNodes = #{Grp0 => grp([csr(Conn0), csr(Conn0), csr(Conn0)]),
                    Grp1 => grp([csr(Conn1), csr(Conn1), csr(Conn1)]),
                    Grp2 => grp([csr(Conn0), csr(Conn1), csr(Conn1)])},
    ?assertEqual(mon_node_eff([LocalNode, Node1]),
                 state_enter_leader(SharedNodes)),

    %% a single disconnected consumer adds a timer for its connection
    OneDisconnected =
        #{Grp0 => grp([csr(Conn0), csr(Conn1, {disconnected, waiting})]),
          Grp2 => grp([csr(Conn0)])},
    ?assertEqual(lists:sort(mon_node_eff([LocalNode, Node1]) ++ [timer_eff(Conn1)]),
                 state_enter_leader(OneDisconnected)),

    %% the same disconnected connection in several groups gets a single timer
    SeveralDisconnected =
        #{Grp0 => grp([csr(Conn0), csr(Conn1, {disconnected, waiting})]),
          Grp1 => grp([csr(Conn0), csr(Conn2, {disconnected, waiting})]),
          Grp2 => grp([csr(Conn0), csr(Conn1, {disconnected, waiting})])},
    ?assertEqual(lists:sort(mon_node_eff([LocalNode, Node1, Node2])
                            ++ timer_eff([Conn1, Conn2])),
                 state_enter_leader(SeveralDisconnected)),

    stop_node(Peer1),
    stop_node(Peer2),
    ok.
1574+
1575+
%% Expected node-monitor effect(s): a list argument gives the sorted list of
%% monitor effects for its elements, any other argument gives a single
%% `{monitor, node, Node}' tuple.
mon_node_eff(Nodes) when is_list(Nodes) ->
    Effects = lists:map(fun mon_node_eff/1, Nodes),
    lists:sort(Effects);
mon_node_eff(Node) ->
    {monitor, node, Node}.
1579+
1580+
%% Expected `node_disconnected' timer effect(s) with a 10_000 timer value:
%% a list argument gives the sorted list of timer effects for its elements,
%% any other argument gives the single timer effect for that connection PID.
timer_eff(Pids) when is_list(Pids) ->
    Effects = lists:map(fun timer_eff/1, Pids),
    lists:sort(Effects);
timer_eff(Pid) ->
    TimerName = {sac, node_disconnected, #{connection_pid => Pid}},
    {timer, TimerName, 10_000}.
1585+
1586+
%% Builds a state from MapState with state/1, calls ?MOD:state_enter/2 for the
%% `leader' Ra state, and returns the resulting effects sorted.
state_enter_leader(MapState) ->
    State = state(MapState),
    Effects = ?MOD:state_enter(leader, State),
    lists:sort(Effects).
1588+
15371589
%% Builds a state from MapState with state/1 and returns the sorted result of
%% ?MOD:list_nodes/1 for it.
list_nodes(MapState) ->
    State = state(MapState),
    Nodes = ?MOD:list_nodes(State),
    lists:sort(Nodes).
15391591

0 commit comments

Comments
 (0)