Skip to content

Commit 909b964

Browse files
SimonUngemichaelklishin
authored andcommitted
cluster wide queue limit
1 parent 9c30562 commit 909b964

File tree

5 files changed

+202
-2
lines changed

5 files changed

+202
-2
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,14 @@ rabbitmq_integration_suite(
334334
],
335335
)
336336

337+
rabbitmq_integration_suite(
338+
name = "cluster_limit_SUITE",
339+
size = "medium",
340+
additional_beam = [
341+
":test_queue_utils_beam",
342+
],
343+
)
344+
337345
rabbitmq_integration_suite(
338346
name = "clustering_management_SUITE",
339347
size = "large",

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2079,6 +2079,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
20792079
erlc_opts = "//:test_erlc_opts",
20802080
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
20812081
)
2082+
erlang_bytecode(
2083+
name = "cluster_limit_SUITE_beam_files",
2084+
testonly = True,
2085+
srcs = ["test/cluster_limit_SUITE.erl"],
2086+
outs = ["test/cluster_limit_SUITE.beam"],
2087+
app_name = "rabbit",
2088+
erlc_opts = "//:test_erlc_opts",
2089+
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"],
2090+
)
20822091
erlang_bytecode(
20832092
name = "message_containers_SUITE_beam_files",
20842093
testonly = True,

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,6 +1437,21 @@ fun(Conf) ->
14371437
end}.
14381438

14391439

1440+
{mapping, "cluster_queue_limit", "rabbit.cluster_queue_limit",
1441+
[{datatype, [{atom, infinity}, integer]}]}.
1442+
1443+
{translation, "rabbit.cluster_queue_limit",
1444+
fun(Conf) ->
1445+
case cuttlefish:conf_get("cluster_queue_limit", Conf, undefined) of
1446+
undefined -> cuttlefish:unset();
1447+
infinity -> infinity;
1448+
Val when is_integer(Val) andalso Val > 0 -> Val;
1449+
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
1450+
end
1451+
end
1452+
}.
1453+
1454+
14401455
%% Interval (in milliseconds) at which we send keepalive messages
14411456
%% to other cluster members. Note that this is not the same thing
14421457
%% as net_ticktime; missed keepalive messages will not cause nodes

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -742,8 +742,8 @@ known_queue_type_names() ->
742742
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
743743
check_queue_limits(Q) ->
744744
maybe
745-
%% Prepare for more checks
746-
ok ?= check_vhost_queue_limit(Q)
745+
ok ?= check_vhost_queue_limit(Q),
746+
ok ?= check_cluster_queue_limit(Q)
747747
end.
748748

749749
check_vhost_queue_limit(Q) ->
@@ -758,3 +758,20 @@ check_vhost_queue_limit(Q) ->
758758
"queue limit in vhost '~ts' (~tp) is reached",
759759
[QueueName, VHost, Limit]}
760760
end.
761+
762+
check_cluster_queue_limit(Q) ->
763+
#resource{name = QueueName} = amqqueue:get_name(Q),
764+
case rabbit_misc:get_env(rabbit, cluster_queue_limit, infinity) of
765+
infinity ->
766+
ok;
767+
Limit ->
768+
case rabbit_db_queue:count() >= Limit of
769+
true ->
770+
{protocol_error, precondition_failed,
771+
"cannot declare queue '~ts': "
772+
"queue limit in cluster (~tp) is reached",
773+
[QueueName, Limit]};
774+
false ->
775+
ok
776+
end
777+
end.
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
8+
-module(cluster_limit_SUITE).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
-include_lib("amqp_client/include/amqp_client.hrl").
13+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
14+
15+
-compile([nowarn_export_all, export_all]).
16+
17+
18+
all() ->
19+
[
20+
{group, clustered}
21+
].
22+
23+
groups() ->
24+
[
25+
{clustered, [],
26+
[
27+
{size_2, [], [queue_limit]}
28+
]}
29+
].
30+
31+
%% -------------------------------------------------------------------
32+
%% Testsuite setup/teardown.
33+
%% -------------------------------------------------------------------
34+
35+
init_per_suite(Config0) ->
36+
rabbit_ct_helpers:log_environment(),
37+
Config1 = rabbit_ct_helpers:merge_app_env(
38+
Config0, {rabbit, [{quorum_tick_interval, 1000},
39+
{cluster_queue_limit, 3}]}),
40+
rabbit_ct_helpers:run_setup_steps(Config1, []).
41+
42+
end_per_suite(Config) ->
43+
rabbit_ct_helpers:run_teardown_steps(Config).
44+
init_per_group(clustered, Config) ->
45+
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
46+
init_per_group(Group, Config) ->
47+
ClusterSize = case Group of
48+
size_2 -> 2
49+
end,
50+
IsMixed = rabbit_ct_helpers:is_mixed_versions(),
51+
case ClusterSize of
52+
2 when IsMixed ->
53+
{skip, "cluster size 2 isn't mixed versions compatible"};
54+
_ ->
55+
Config1 = rabbit_ct_helpers:set_config(Config,
56+
[{rmq_nodes_count, ClusterSize},
57+
{rmq_nodename_suffix, Group},
58+
{tcp_ports_base}]),
59+
Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
60+
rabbit_ct_helpers:run_steps(Config1b,
61+
[fun merge_app_env/1 ] ++
62+
rabbit_ct_broker_helpers:setup_steps())
63+
end.
64+
65+
end_per_group(clustered, Config) ->
66+
Config;
67+
end_per_group(_, Config) ->
68+
rabbit_ct_helpers:run_steps(Config,
69+
rabbit_ct_broker_helpers:teardown_steps()).
70+
71+
init_per_testcase(Testcase, Config) ->
72+
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
73+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
74+
Q = rabbit_data_coercion:to_binary(Testcase),
75+
Config2 = rabbit_ct_helpers:set_config(Config1,
76+
[{queue_name, Q},
77+
{alt_queue_name, <<Q/binary, "_alt">>},
78+
{alt_2_queue_name, <<Q/binary, "_alt_2">>},
79+
{over_limit_queue_name, <<Q/binary, "_over_limit">>}
80+
]),
81+
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
82+
83+
merge_app_env(Config) ->
84+
rabbit_ct_helpers:merge_app_env(
85+
rabbit_ct_helpers:merge_app_env(Config,
86+
{rabbit, [{core_metrics_gc_interval, 100}]}),
87+
{ra, [{min_wal_roll_over_interval, 30000}]}).
88+
89+
end_per_testcase(Testcase, Config) ->
90+
Config1 = rabbit_ct_helpers:run_steps(
91+
Config,
92+
rabbit_ct_client_helpers:teardown_steps()),
93+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
94+
95+
96+
%% -------------------------------------------------------------------
97+
%% Testcases.
98+
%% -------------------------------------------------------------------
99+
100+
queue_limit(Config) ->
101+
[Server0, Server1] =
102+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
103+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
104+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1),
105+
Q1 = ?config(queue_name, Config),
106+
?assertEqual({'queue.declare_ok', Q1, 0, 0},
107+
declare(Ch, Q1)),
108+
109+
Q2 = ?config(alt_queue_name, Config),
110+
?assertEqual({'queue.declare_ok', Q2, 0, 0},
111+
declare(Ch, Q2)),
112+
113+
Q3 = ?config(alt_2_queue_name, Config),
114+
?assertEqual({'queue.declare_ok', Q3, 0, 0},
115+
declare(Ch, Q3)),
116+
Q4 = ?config(over_limit_queue_name, Config),
117+
ExpectedError = list_to_binary(io_lib:format("PRECONDITION_FAILED - cannot declare queue '~s': queue limit in cluster (3) is reached", [Q4])),
118+
?assertExit(
119+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
120+
declare(Ch, Q4)),
121+
122+
%% Trying the second server, in the cluster, but no queues on it,
123+
%% but should still fail as the limit is cluster wide.
124+
?assertExit(
125+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
126+
declare(Ch2, Q4)),
127+
128+
%Trying other types of queues
129+
ChQQ = rabbit_ct_client_helpers:open_channel(Config, Server0),
130+
ChStream = rabbit_ct_client_helpers:open_channel(Config, Server1),
131+
?assertExit(
132+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
133+
declare(ChQQ, Q4, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
134+
?assertExit(
135+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
136+
declare(ChStream, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
137+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
138+
ok.
139+
140+
declare(Ch, Q) ->
141+
declare(Ch, Q, []).
142+
143+
declare(Ch, Q, Args) ->
144+
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
145+
durable = true,
146+
auto_delete = false,
147+
arguments = Args}).
148+
149+
delete_queues() ->
150+
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
151+
|| Q <- rabbit_amqqueue:list()].

0 commit comments

Comments
 (0)