Skip to content

Commit 2fc5ca0

Browse files
committed
Add activate_stream_consumer command
New CLI command to trigger a rebalancing in a SAC group and activate a consumer. This is a last resort solution if all consumers in a group accidently end up in {connected, waiting} state. The command re-uses an existing function, which only picks the consumer that should be active. This means it does not try to "fix" the state (e.g. removing a disconnected consumer because its node is definitely gone from the cluster). Fixes #14055
1 parent 53cff3b commit 2fc5ca0

File tree

4 files changed

+257
-5
lines changed

4 files changed

+257
-5
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ unregister_consumer(VirtualHost,
128128
-spec activate_consumer(binary(), binary(), binary()) ->
129129
ok | {error, sac_error() | term()}.
130130
activate_consumer(VH, Stream, Name) ->
131-
process_command(#command_activate_consumer{vhost =VH,
131+
process_command(#command_activate_consumer{vhost = VH,
132132
stream = Stream,
133133
consumer_name= Name}).
134134

@@ -323,7 +323,13 @@ apply(#command_activate_consumer{vhost = VirtualHost,
323323
end,
324324
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
325325
G, StreamGroups0),
326-
{State0#?MODULE{groups = StreamGroups1}, ok, Eff};
326+
R = case G of
327+
undefined ->
328+
{error, not_found};
329+
_ ->
330+
ok
331+
end,
332+
{State0#?MODULE{groups = StreamGroups1}, R, Eff};
327333
apply(#command_connection_reconnected{pid = Pid},
328334
#?MODULE{groups = Groups0} = State0) ->
329335
{State1, Eff} =
@@ -1157,9 +1163,8 @@ maybe_create_group(VirtualHost,
11571163
#{{VirtualHost, Stream, ConsumerName} := _} ->
11581164
{ok, StreamGroups};
11591165
SGS ->
1160-
{ok, maps:put({VirtualHost, Stream, ConsumerName},
1161-
#group{consumers = [], partition_index = PartitionIndex},
1162-
SGS)}
1166+
{ok, SGS#{{VirtualHost, Stream, ConsumerName} =>
1167+
#group{consumers = [], partition_index = PartitionIndex}}}
11631168
end.
11641169

11651170
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,82 @@ active_consumer_super_stream_disconn_active_block_rebalancing_test(_) ->
949949
assertEmpty(Eff),
950950
ok.
951951

952+
activate_consumer_simple_unblock_all_waiting_test(_) ->
953+
P = self(),
954+
GId = group_id(),
955+
Group = grp([csr(P, 0, {connected, waiting}),
956+
csr(P, 1, {connected, waiting}),
957+
csr(P, 2, {connected, waiting})]),
958+
959+
Groups0 = #{GId => Group},
960+
State0 = state(Groups0),
961+
Cmd = activate_consumer_command(stream(), name()),
962+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
963+
assertHasGroup(GId, grp([csr(P, 0, {connected, active}),
964+
csr(P, 1, {connected, waiting}),
965+
csr(P, 2, {connected, waiting})]),
966+
Groups1),
967+
assertContainsActivateMessage(P, 0, Eff),
968+
ok.
969+
970+
activate_consumer_simple_unblock_ignore_disconnected_test(_) ->
971+
P = self(),
972+
GId = group_id(),
973+
Group = grp([csr(P, 0, {disconnected, waiting}),
974+
csr(P, 1, {connected, waiting}),
975+
csr(P, 2, {connected, waiting}),
976+
csr(P, 3, {connected, waiting})]),
977+
978+
Groups0 = #{GId => Group},
979+
State0 = state(Groups0),
980+
Cmd = activate_consumer_command(stream(), name()),
981+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
982+
assertHasGroup(GId, grp([csr(P, 0, {disconnected, waiting}),
983+
csr(P, 1, {connected, active}),
984+
csr(P, 2, {connected, waiting}),
985+
csr(P, 3, {connected, waiting})]),
986+
Groups1),
987+
assertContainsActivateMessage(P, 1, Eff),
988+
ok.
989+
990+
activate_consumer_super_stream_unblock_all_waiting_test(_) ->
991+
P = self(),
992+
GId = group_id(),
993+
Group = grp(1, [csr(P, 0, {connected, waiting}),
994+
csr(P, 1, {connected, waiting}),
995+
csr(P, 2, {connected, waiting})]),
996+
997+
Groups0 = #{GId => Group},
998+
State0 = state(Groups0),
999+
Cmd = activate_consumer_command(stream(), name()),
1000+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
1001+
assertHasGroup(GId, grp(1, [csr(P, 0, {connected, waiting}),
1002+
csr(P, 1, {connected, active}),
1003+
csr(P, 2, {connected, waiting})]),
1004+
Groups1),
1005+
assertContainsActivateMessage(P, 1, Eff),
1006+
ok.
1007+
1008+
activate_consumer_super_stream_unblock_ignore_disconnected_test(_) ->
1009+
P = self(),
1010+
GId = group_id(),
1011+
Group = grp(1, [csr(P, 0, {disconnected, waiting}),
1012+
csr(P, 1, {connected, waiting}),
1013+
csr(P, 2, {connected, waiting}),
1014+
csr(P, 3, {connected, waiting})]),
1015+
1016+
Groups0 = #{GId => Group},
1017+
State0 = state(Groups0),
1018+
Cmd = activate_consumer_command(stream(), name()),
1019+
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
1020+
assertHasGroup(GId, grp(1, [csr(P, 0, {disconnected, waiting}),
1021+
csr(P, 1, {connected, waiting}),
1022+
csr(P, 2, {connected, active}),
1023+
csr(P, 3, {connected, waiting})]),
1024+
Groups1),
1025+
assertContainsActivateMessage(P, 2, Eff),
1026+
ok.
1027+
9521028
handle_connection_down_simple_disconn_active_block_rebalancing_test(_) ->
9531029
Pid0 = new_process(),
9541030
Pid1 = new_process(),
@@ -1729,6 +1805,10 @@ assertContainsCheckConnectionEffect(Pid, Effects) ->
17291805
assertContainsSendMessageEffect(Pid, Stream, Active, Effects) ->
17301806
assertContainsSendMessageEffect(Pid, 0, Stream, name(), Active, Effects).
17311807

1808+
assertContainsActivateMessage(Pid, SubId, Effects) ->
1809+
assertContainsSendMessageEffect(Pid, SubId, stream(), name(),
1810+
true, Effects).
1811+
17321812
assertContainsActivateMessage(Pid, Effects) ->
17331813
assertContainsSendMessageEffect(Pid, sub_id(), stream(), name(),
17341814
true, Effects).
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
15+
16+
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
17+
18+
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
19+
20+
-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
21+
22+
-export([formatter/0,
23+
scopes/0,
24+
switches/0,
25+
aliases/0,
26+
usage/0,
27+
usage_additional/0,
28+
usage_doc_guides/0,
29+
banner/2,
30+
validate/2,
31+
merge_defaults/2,
32+
run/2,
33+
output/2,
34+
description/0,
35+
help_section/0]).
36+
37+
formatter() ->
38+
'Elixir.RabbitMQ.CLI.Formatters.String'.
39+
40+
scopes() ->
41+
[ctl, streams].
42+
43+
switches() ->
44+
[{stream, string}, {reference, string}].
45+
46+
aliases() ->
47+
[].
48+
49+
description() ->
50+
<<"Trigger a rebalancing to activate a consumer in "
51+
"a single active consumer group">>.
52+
53+
help_section() ->
54+
{plugin, stream}.
55+
56+
validate([], #{stream := _, reference := _}) ->
57+
ok;
58+
validate(Args, _) when is_list(Args) andalso length(Args) > 0 ->
59+
{validation_failure, too_many_args};
60+
validate(_, _) ->
61+
{validation_failure, not_enough_args}.
62+
63+
merge_defaults(_Args, Opts) ->
64+
{[], maps:merge(#{vhost => <<"/">>}, Opts)}.
65+
66+
usage() ->
67+
<<"activate_stream_consumer --stream <stream> "
68+
"--reference <reference> [--vhost <vhost>]">>.
69+
70+
usage_additional() ->
71+
<<"debugging command, use only when a group does not have "
72+
"an active consumer">>.
73+
74+
usage_doc_guides() ->
75+
[?STREAMS_GUIDE_URL].
76+
77+
run(_,
78+
#{node := NodeName,
79+
vhost := VHost,
80+
stream := Stream,
81+
reference := Reference,
82+
timeout := Timeout}) ->
83+
rabbit_misc:rpc_call(NodeName,
84+
rabbit_stream_sac_coordinator,
85+
activate_consumer,
86+
[VHost, Stream, Reference],
87+
Timeout).
88+
89+
banner(_, _) ->
90+
<<"Activating a consumer in the group ...">>.
91+
92+
output(ok, _Opts) ->
93+
'Elixir.RabbitMQ.CLI.DefaultOutput':output({ok,
94+
<<"OK">>});
95+
output({error, not_found}, _Opts) ->
96+
'Elixir.RabbitMQ.CLI.DefaultOutput':output({error_string,
97+
<<"The group does not exist">>});
98+
output(Result, _Opts) ->
99+
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,17 @@
3333
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand').
3434
-define(COMMAND_LIST_STREAM_TRACKING,
3535
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand').
36+
-define(COMMAND_ACTIVATE_STREAM_CONSUMER,
37+
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand').
38+
3639

3740
all() ->
3841
[{group, list_connections},
3942
{group, list_consumers},
4043
{group, list_publishers},
4144
{group, list_consumer_groups},
4245
{group, list_group_consumers},
46+
{group, activate_consumer},
4347
{group, list_stream_tracking},
4448
{group, super_streams}].
4549

@@ -57,6 +61,9 @@ groups() ->
5761
{list_group_consumers, [],
5862
[list_group_consumers_validate, list_group_consumers_merge_defaults,
5963
list_group_consumers_run]},
64+
{activate_consumer, [],
65+
[activate_consumer_validate, activate_consumer_merge_defaults,
66+
activate_consumer_run]},
6067
{list_stream_tracking, [],
6168
[list_stream_tracking_validate, list_stream_tracking_merge_defaults,
6269
list_stream_tracking_run]},
@@ -524,6 +531,67 @@ list_group_consumers_run(Config) ->
524531
close(S, C),
525532
ok.
526533

534+
activate_consumer_validate(_) ->
535+
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
536+
ValidOpts = #{vhost => <<"/">>,
537+
stream => <<"s1">>,
538+
reference => <<"foo">>},
539+
?assertMatch({validation_failure, not_enough_args},
540+
Cmd:validate([], #{})),
541+
?assertMatch({validation_failure, not_enough_args},
542+
Cmd:validate([], #{vhost => <<"test">>})),
543+
?assertMatch({validation_failure, too_many_args},
544+
Cmd:validate([<<"foo">>], ValidOpts)),
545+
?assertMatch(ok, Cmd:validate([], ValidOpts)).
546+
547+
activate_consumer_merge_defaults(_Config) ->
548+
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
549+
Opts = #{vhost => <<"/">>,
550+
stream => <<"s1">>,
551+
reference => <<"foo">>},
552+
?assertEqual({[], Opts},
553+
Cmd:merge_defaults([], maps:without([vhost], Opts))),
554+
Merged = maps:merge(Opts, #{vhost => "vhost"}),
555+
?assertEqual({[], Merged},
556+
Cmd:merge_defaults([], Merged)).
557+
558+
activate_consumer_run(Config) ->
559+
Cmd = ?COMMAND_ACTIVATE_STREAM_CONSUMER,
560+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
561+
Opts =#{node => Node,
562+
timeout => 10000,
563+
vhost => <<"/">>},
564+
Args = [],
565+
566+
St = atom_to_binary(?FUNCTION_NAME, utf8),
567+
ConsumerReference = <<"foo">>,
568+
OptsGroup = maps:merge(#{stream => St, reference => ConsumerReference},
569+
Opts),
570+
571+
%% the group does not exist yet
572+
?assertEqual({error, not_found}, Cmd:run(Args, OptsGroup)),
573+
574+
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
575+
{S, C} = start_stream_connection(StreamPort),
576+
?awaitMatch(1, connection_count(Config), ?WAIT),
577+
578+
SubProperties =#{<<"single-active-consumer">> => <<"true">>,
579+
<<"name">> => ConsumerReference},
580+
581+
create_stream(S, St, C),
582+
subscribe(S, 0, St, SubProperties, C),
583+
handle_consumer_update(S, C, 0),
584+
subscribe(S, 1, St, SubProperties, C),
585+
subscribe(S, 2, St, SubProperties, C),
586+
587+
?awaitMatch(3, consumer_count(Config), ?WAIT),
588+
589+
?assertEqual(ok, Cmd:run(Args, OptsGroup)),
590+
591+
delete_stream(S, St, C),
592+
close(S, C),
593+
ok.
594+
527595
handle_consumer_update(S, C0, SubId) ->
528596
{{request, CorrId, {consumer_update, SubId, true}}, C1} =
529597
rabbit_stream_SUITE:receive_commands(gen_tcp, S, C0),

0 commit comments

Comments
 (0)