Skip to content

Commit 2bfb092

Browse files
committed
Remove only stream subscriptions affected by down stream member
The clean-up of a stream connection's state when a stream member goes down can remove subscriptions that are not affected by that member. The subscription state is removed from the connection, but the subscription is not removed from the single active consumer (SAC) state (if the subscription is a SAC), because the subscription's member PID does not match the PID of the member that went down. When the actual member of the subscription later goes down, the subscription is no longer part of the connection state, so the clean-up does not find the subscription and does not remove it from the SAC state. This leaves a ghost consumer in the corresponding SAC group. This commit makes sure that only the affected subscriptions are removed from the state when a stream member goes down. Fixes #13961 Conflicts: deps/rabbitmq_stream/src/rabbit_stream_reader.erl
1 parent 910a5e6 commit 2bfb092

File tree

2 files changed

+139
-77
lines changed

2 files changed

+139
-77
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 101 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@
106106
close_sent/3]).
107107
-ifdef(TEST).
108108
-export([ensure_token_expiry_timer/2,
109-
evaluate_state_after_secret_update/4]).
109+
evaluate_state_after_secret_update/4,
110+
clean_subscriptions/4]).
110111
-endif.
111112

112113
callback_mode() ->
@@ -3280,89 +3281,19 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
32803281

32813282
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
32823283
#stream_connection{
3283-
user = #user{username = Username},
3284-
virtual_host = VirtualHost,
3285-
stream_subscriptions = StreamSubscriptions,
3286-
publishers = Publishers,
3287-
publisher_to_ids = PublisherToIds,
3288-
stream_leaders = Leaders,
3289-
outstanding_requests = Requests0} = C0,
3290-
#stream_connection_state{consumers = Consumers} = S0) ->
3284+
stream_leaders = Leaders} = C0,
3285+
S0) ->
32913286
{SubscriptionsCleaned, C1, S1} =
32923287
case stream_has_subscriptions(Stream, C0) of
32933288
true ->
3294-
#{Stream := SubscriptionIds} = StreamSubscriptions,
3295-
Requests1 = lists:foldl(
3296-
fun(SubId, Rqsts0) ->
3297-
#{SubId := Consumer} = Consumers,
3298-
case {MemberPid, Consumer} of
3299-
{undefined, _C} ->
3300-
rabbit_stream_metrics:consumer_cancelled(self(),
3301-
stream_r(Stream,
3302-
C0),
3303-
SubId,
3304-
Username),
3305-
maybe_unregister_consumer(
3306-
VirtualHost, Consumer,
3307-
single_active_consumer(Consumer),
3308-
Rqsts0);
3309-
{MemberPid, #consumer{configuration =
3310-
#consumer_configuration{member_pid = MemberPid}}} ->
3311-
rabbit_stream_metrics:consumer_cancelled(self(),
3312-
stream_r(Stream,
3313-
C0),
3314-
SubId,
3315-
Username),
3316-
maybe_unregister_consumer(
3317-
VirtualHost, Consumer,
3318-
single_active_consumer(Consumer),
3319-
Rqsts0);
3320-
_ ->
3321-
Rqsts0
3322-
end
3323-
end, Requests0, SubscriptionIds),
3324-
{true,
3325-
C0#stream_connection{stream_subscriptions =
3326-
maps:remove(Stream,
3327-
StreamSubscriptions),
3328-
outstanding_requests = Requests1},
3329-
S0#stream_connection_state{consumers =
3330-
maps:without(SubscriptionIds,
3331-
Consumers)}};
3289+
clean_subscriptions(MemberPid, Stream, C0, S0);
33323290
false ->
33333291
{false, C0, S0}
33343292
end,
33353293
{PublishersCleaned, C2, S2} =
33363294
case stream_has_publishers(Stream, C1) of
33373295
true ->
3338-
{PurgedPubs, PurgedPubToIds} =
3339-
maps:fold(fun(PubId,
3340-
#publisher{stream = S, reference = Ref},
3341-
{Pubs, PubToIds}) when S =:= Stream andalso MemberPid =:= undefined ->
3342-
rabbit_stream_metrics:publisher_deleted(self(),
3343-
stream_r(Stream,
3344-
C1),
3345-
PubId),
3346-
{maps:remove(PubId, Pubs),
3347-
maps:remove({Stream, Ref}, PubToIds)};
3348-
(PubId,
3349-
#publisher{stream = S, reference = Ref, leader = MPid},
3350-
{Pubs, PubToIds}) when S =:= Stream andalso MPid =:= MemberPid ->
3351-
rabbit_stream_metrics:publisher_deleted(self(),
3352-
stream_r(Stream,
3353-
C1),
3354-
PubId),
3355-
{maps:remove(PubId, Pubs),
3356-
maps:remove({Stream, Ref}, PubToIds)};
3357-
3358-
(_PubId, _Publisher, {Pubs, PubToIds}) ->
3359-
{Pubs, PubToIds}
3360-
end,
3361-
{Publishers, PublisherToIds}, Publishers),
3362-
{true,
3363-
C1#stream_connection{publishers = PurgedPubs,
3364-
publisher_to_ids = PurgedPubToIds},
3365-
S1};
3296+
clean_publishers(MemberPid, Stream, C1, S1);
33663297
false ->
33673298
{false, C1, S1}
33683299
end,
@@ -3384,6 +3315,99 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33843315
{not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
33853316
end.
33863317

3318+
clean_subscriptions(MemberPid, Stream,
3319+
#stream_connection{user = #user{username = Username},
3320+
virtual_host = VirtualHost,
3321+
stream_subscriptions = StreamSubs,
3322+
outstanding_requests = Requests0} = C0,
3323+
#stream_connection_state{consumers = Consumers} = S0) ->
3324+
#{Stream := SubIds} = StreamSubs,
3325+
{DelSubs1, Requests1} =
3326+
lists:foldl(
3327+
fun(SubId, {DelSubIds, Rqsts0}) ->
3328+
#{SubId := Consumer} = Consumers,
3329+
case {MemberPid, Consumer} of
3330+
{undefined, _C} ->
3331+
rabbit_stream_metrics:consumer_cancelled(self(),
3332+
stream_r(Stream,
3333+
C0),
3334+
SubId,
3335+
Username),
3336+
Rqsts1 = maybe_unregister_consumer(
3337+
VirtualHost, Consumer,
3338+
single_active_consumer(Consumer),
3339+
Rqsts0),
3340+
{[SubId | DelSubIds], Rqsts1};
3341+
{MemberPid,
3342+
#consumer{configuration =
3343+
#consumer_configuration{member_pid = MemberPid}}} ->
3344+
rabbit_stream_metrics:consumer_cancelled(self(),
3345+
stream_r(Stream,
3346+
C0),
3347+
SubId,
3348+
Username,
3349+
false),
3350+
Rqsts1 = maybe_unregister_consumer(
3351+
VirtualHost, Consumer,
3352+
single_active_consumer(Consumer),
3353+
Rqsts0),
3354+
{[SubId | DelSubIds], Rqsts1};
3355+
_ ->
3356+
{DelSubIds, Rqsts0}
3357+
end
3358+
end, {[], Requests0}, SubIds),
3359+
case DelSubs1 of
3360+
[] ->
3361+
{false, C0, S0};
3362+
_ ->
3363+
StreamSubs1 = case SubIds -- DelSubs1 of
3364+
[] ->
3365+
maps:remove(Stream, StreamSubs);
3366+
RemSubIds ->
3367+
StreamSubs#{Stream => RemSubIds}
3368+
end,
3369+
Consumers1 = maps:without(DelSubs1, Consumers),
3370+
{true,
3371+
C0#stream_connection{stream_subscriptions = StreamSubs1,
3372+
outstanding_requests = Requests1},
3373+
S0#stream_connection_state{consumers = Consumers1}}
3374+
end.
3375+
3376+
clean_publishers(MemberPid, Stream,
3377+
#stream_connection{
3378+
publishers = Publishers,
3379+
publisher_to_ids = PublisherToIds} = C0, S0) ->
3380+
{Updated, PurgedPubs, PurgedPubToIds} =
3381+
maps:fold(fun(PubId, #publisher{stream = S, reference = Ref},
3382+
{_, Pubs, PubToIds})
3383+
when S =:= Stream andalso MemberPid =:= undefined ->
3384+
rabbit_stream_metrics:publisher_deleted(self(),
3385+
stream_r(Stream,
3386+
C0),
3387+
PubId),
3388+
{true,
3389+
maps:remove(PubId, Pubs),
3390+
maps:remove({Stream, Ref}, PubToIds)};
3391+
(PubId, #publisher{stream = S, reference = Ref, leader = MPid},
3392+
{_, Pubs, PubToIds})
3393+
when S =:= Stream andalso MPid =:= MemberPid ->
3394+
rabbit_stream_metrics:publisher_deleted(self(),
3395+
stream_r(Stream,
3396+
C0),
3397+
PubId),
3398+
{true,
3399+
maps:remove(PubId, Pubs),
3400+
maps:remove({Stream, Ref}, PubToIds)};
3401+
3402+
(_PubId, _Publisher, {Updated, Pubs, PubToIds}) ->
3403+
{Updated, Pubs, PubToIds}
3404+
end,
3405+
{false, Publishers, PublisherToIds}, Publishers),
3406+
{Updated,
3407+
C0#stream_connection{publishers = PurgedPubs,
3408+
publisher_to_ids = PurgedPubToIds},
3409+
S0}.
3410+
33873411
store_offset(Reference, _, _, C) when ?IS_INVALID_REF(Reference) ->
33883412
rabbit_log:warning("Reference is too long to store offset: ~p", [byte_size(Reference)]),
33893413
C;
@@ -3401,8 +3425,7 @@ store_offset(Reference, Stream, Offset, Connection0) ->
34013425

34023426
lookup_leader(Stream,
34033427
#stream_connection{stream_leaders = StreamLeaders,
3404-
virtual_host = VirtualHost} =
3405-
Connection) ->
3428+
virtual_host = VirtualHost} = Connection) ->
34063429
case maps:get(Stream, StreamLeaders, undefined) of
34073430
undefined ->
34083431
case lookup_leader_from_manager(VirtualHost, Stream) of
@@ -3411,6 +3434,7 @@ lookup_leader(Stream,
34113434
{ok, LeaderPid} ->
34123435
Connection1 =
34133436
maybe_monitor_stream(LeaderPid, Stream, Connection),
3437+
34143438
{LeaderPid,
34153439
Connection1#stream_connection{stream_leaders =
34163440
StreamLeaders#{Stream =>

deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,44 @@ evaluate_state_after_secret_update_test(_) ->
184184
?assert(is_integer(Cancel2)),
185185
ok.
186186

187+
clean_subscriptions_should_remove_only_affected_subscriptions_test(_) ->
188+
Mod = rabbit_stream_reader,
189+
meck:new(Mod, [passthrough]),
190+
meck:new(rabbit_stream_metrics, [stub_all]),
191+
meck:new(rabbit_stream_sac_coordinator, [stub_all]),
192+
193+
S = <<"s1">>,
194+
Pid1 = new_process(),
195+
Pid2 = new_process(),
196+
StreamSubs = #{S => [0, 1]},
197+
Consumers = #{0 => consumer(S, Pid1),
198+
1 => consumer(S, Pid2)},
199+
200+
C0 = #stream_connection{stream_subscriptions = StreamSubs,
201+
user = #user{}},
202+
S0 = #stream_connection_state{consumers = Consumers},
203+
{Cleaned1, C1, S1} = Mod:clean_subscriptions(Pid1, S, C0, S0),
204+
?assert(Cleaned1),
205+
?assertEqual(#{S => [1]},
206+
C1#stream_connection.stream_subscriptions),
207+
?assertEqual(#{1 => consumer(S, Pid2)},
208+
S1#stream_connection_state.consumers),
209+
210+
{Cleaned2, C2, S2} = Mod:clean_subscriptions(Pid2, S, C1, S1),
211+
?assert(Cleaned2),
212+
?assertEqual(#{}, C2#stream_connection.stream_subscriptions),
213+
?assertEqual(#{}, S2#stream_connection_state.consumers),
214+
215+
ok.
216+
217+
consumer(S, Pid) ->
218+
#consumer{configuration = #consumer_configuration{stream = S,
219+
member_pid = Pid}}.
220+
187221
consumer(S) ->
188222
#consumer{configuration = #consumer_configuration{stream = S},
189223
log = osiris_log:init(#{})}.
224+
225+
new_process() ->
226+
spawn(node(), fun() -> ok end).
227+

0 commit comments

Comments
 (0)