Skip to content

Commit fb6dfd9

Browse files
Merge pull request #12112 from rabbitmq/mergify/bp/v4.0.x/pr-12109
Default queue type (DQT): fall back to node-wide default when virtual host has no metadata set (backport #12109)
2 parents e60d512 + bd2ea28 commit fb6dfd9

File tree

7 files changed

+109
-47
lines changed

7 files changed

+109
-47
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -733,31 +733,38 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
733733
#{default_queue_type := DefaultQueueType}
734734
when is_binary(DefaultQueueType) andalso
735735
not HasQTypeArg ->
736-
Type = rabbit_queue_type:discover(DefaultQueueType),
737-
IsPermitted = is_queue_args_combination_permitted(
738-
Durable, Exclusive),
739-
IsCompatible = rabbit_queue_type:is_compatible(
740-
Type, Durable, Exclusive, AutoDelete),
741-
case IsPermitted andalso IsCompatible of
742-
true ->
743-
%% patch up declare arguments with x-queue-type if there
744-
%% is a vhost default set the queue is durable and not exclusive
745-
%% and there is no queue type argument
746-
%% present
747-
rabbit_misc:set_table_value(Args0,
748-
<<"x-queue-type">>,
749-
longstr,
750-
DefaultQueueType);
751-
false ->
752-
%% if the properties are incompatible with the declared
753-
%% DQT, use the fall back type
754-
rabbit_misc:set_table_value(Args0,
755-
<<"x-queue-type">>,
756-
longstr,
757-
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
758-
end;
736+
update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args0);
759737
_ ->
760-
Args0
738+
case HasQTypeArg of
739+
true -> Args0;
740+
false ->
741+
update_args_table_with_queue_type(rabbit_queue_type:short_alias_of(rabbit_queue_type:default()), Durable, Exclusive, AutoDelete, Args0)
742+
end
743+
end.
744+
745+
update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args) ->
746+
Type = rabbit_queue_type:discover(DefaultQueueType),
747+
IsPermitted = is_queue_args_combination_permitted(
748+
Durable, Exclusive),
749+
IsCompatible = rabbit_queue_type:is_compatible(
750+
Type, Durable, Exclusive, AutoDelete),
751+
case IsPermitted andalso IsCompatible of
752+
true ->
753+
%% patch up declare arguments with x-queue-type if there
754+
%% is a vhost default set the queue is durable and not exclusive
755+
%% and there is no queue type argument
756+
%% present
757+
rabbit_misc:set_table_value(Args,
758+
<<"x-queue-type">>,
759+
longstr,
760+
DefaultQueueType);
761+
false ->
762+
%% if the properties are incompatible with the declared
763+
%% DQT, use the fall back type
764+
rabbit_misc:set_table_value(Args,
765+
<<"x-queue-type">>,
766+
longstr,
767+
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
761768
end.
762769

763770
-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,23 @@ short_alias_of(<<"rabbit_quorum_queue">>) ->
300300
<<"quorum">>;
301301
short_alias_of(rabbit_quorum_queue) ->
302302
<<"quorum">>;
303+
%% AMQP 1.0 management client
304+
short_alias_of({utf8, <<"quorum">>}) ->
305+
<<"quorum">>;
303306
short_alias_of(<<"rabbit_classic_queue">>) ->
304307
<<"classic">>;
305308
short_alias_of(rabbit_classic_queue) ->
306309
<<"classic">>;
310+
%% AMQP 1.0 management client
311+
short_alias_of({utf8, <<"classic">>}) ->
312+
<<"classic">>;
307313
short_alias_of(<<"rabbit_stream_queue">>) ->
308314
<<"stream">>;
309315
short_alias_of(rabbit_stream_queue) ->
310316
<<"stream">>;
317+
%% AMQP 1.0 management client
318+
short_alias_of({utf8, <<"stream">>}) ->
319+
<<"stream">>;
311320
short_alias_of(_Other) ->
312321
undefined.
313322

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ all_tests() ->
144144
server_system_recover,
145145
vhost_with_quorum_queue_is_deleted,
146146
vhost_with_default_queue_type_declares_quorum_queue,
147+
node_wide_default_queue_type_declares_quorum_queue,
147148
delete_immediately_by_resource,
148149
consume_redelivery_count,
149150
subscribe_redelivery_count,
@@ -604,7 +605,7 @@ start_queue_concurrent(Config) ->
604605
quorum_cluster_size_3(Config) ->
605606
case rabbit_ct_helpers:is_mixed_versions() of
606607
true ->
607-
{skip, "quorum_cluster_size_3 tests isn't mixed version reliable"};
608+
{skip, "quorum_cluster_size_3 test isn't mixed version reliable"};
608609
false ->
609610
quorum_cluster_size_x(Config, 3, 3)
610611
end.
@@ -829,6 +830,40 @@ vhost_with_default_queue_type_declares_quorum_queue(Config) ->
829830
amqp_connection:close(Conn),
830831
ok.
831832

833+
node_wide_default_queue_type_declares_quorum_queue(Config) ->
834+
case rabbit_ct_helpers:is_mixed_versions() of
835+
true ->
836+
{skip, "node_wide_default_queue_type_declares_quorum_queue test isn't mixed version compatible"};
837+
false ->
838+
node_wide_default_queue_type_declares_quorum_queue0(Config)
839+
end.
840+
841+
node_wide_default_queue_type_declares_quorum_queue0(Config) ->
842+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
843+
rpc:call(Node, application, set_env, [rabbit, default_queue_type, rabbit_quorum_queue]),
844+
VHost = atom_to_binary(?FUNCTION_NAME, utf8),
845+
QName = atom_to_binary(?FUNCTION_NAME, utf8),
846+
User = ?config(rmq_username, Config),
847+
848+
AddVhostArgs = [VHost, #{}, User],
849+
ok = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_vhost, add,
850+
AddVhostArgs),
851+
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
852+
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost),
853+
{ok, Ch} = amqp_connection:open_channel(Conn),
854+
?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])),
855+
assert_queue_type(Node, VHost, QName, rabbit_quorum_queue),
856+
?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])),
857+
?assertEqual({'queue.declare_ok', QName, 0, 0},
858+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
859+
?assertEqual({'queue.declare_ok', QName, 0, 0}, declare_passive(Ch, QName, [])),
860+
?assertEqual({'queue.declare_ok', QName, 0, 0},
861+
declare_passive(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
862+
amqp_connection:close(Conn),
863+
864+
rpc:call(Node, application, set_env, [rabbit, default_queue_type, rabbit_classic_queue]),
865+
ok.
866+
832867
restart_all_types(Config) ->
833868
%% Test the node restart with both types of queues (quorum and classic) to
834869
%% ensure there are no regressions
@@ -1236,7 +1271,7 @@ shrink_all(Config) ->
12361271
rebalance(Config) ->
12371272
case rabbit_ct_helpers:is_mixed_versions() of
12381273
true ->
1239-
{skip, "rebalance tests isn't mixed version compatible"};
1274+
{skip, "rebalance test isn't mixed version compatible"};
12401275
false ->
12411276
rebalance0(Config)
12421277
end.
@@ -1704,7 +1739,7 @@ leadership_takeover(Config) ->
17041739
metrics_cleanup_on_leadership_takeover(Config) ->
17051740
case rabbit_ct_helpers:is_mixed_versions() of
17061741
true ->
1707-
{skip, "metrics_cleanup_on_leadership_takeover tests isn't mixed version compatible"};
1742+
{skip, "metrics_cleanup_on_leadership_takeover test isn't mixed version compatible"};
17081743
false ->
17091744
metrics_cleanup_on_leadership_takeover0(Config)
17101745
end.

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ init_per_testcase(Testcase, Config) ->
117117
rabbit_ct_helpers:testcase_started(Config, Testcase).
118118

119119
end_per_testcase(Testcase, Config) ->
120-
%% Assert that every testcase cleaned up.
120+
%% Ensure that all queues were cleaned up
121121
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
122122
rabbit_ct_helpers:testcase_finished(Config, Testcase).
123123

@@ -268,12 +268,12 @@ all_management_operations(Config) ->
268268
queue_defaults(Config) ->
269269
Init = {_, LinkPair} = init(Config),
270270
QName = atom_to_binary(?FUNCTION_NAME),
271+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
271272
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
272-
[Q] = rpc(Config, rabbit_amqqueue, list, []),
273+
{ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [QName, <<"/">>]),
273274
?assert(rpc(Config, amqqueue, is_durable, [Q])),
274275
?assertNot(rpc(Config, amqqueue, is_exclusive, [Q])),
275276
?assertNot(rpc(Config, amqqueue, is_auto_delete, [Q])),
276-
?assertEqual([], rpc(Config, amqqueue, get_arguments, [Q])),
277277

278278
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
279279
ok = cleanup(Init).
@@ -448,10 +448,11 @@ declare_queue_default_queue_type(Config) ->
448448
{ok, Session} = amqp10_client:begin_session_sync(Connection),
449449
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
450450

451+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
451452
?assertMatch({ok, #{type := <<"quorum">>}},
452453
rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{})),
453454

454-
{ok, #{}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
455+
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
455456
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
456457
ok = amqp10_client:end_session(Session),
457458
ok = amqp10_client:close_connection(Connection),

deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,10 @@ atomize_map_keys(I) ->
268268

269269
%% @todo There wasn't a specific order before; now there is; maybe we shouldn't have one?
270270
assert_list(Exp, Act) ->
271-
case length(Exp) == length(Act) of
271+
%% allow actual map to include keys we do not assert on
272+
%% but not the other way around: we may want to only assert on a subset
273+
%% of keys
274+
case length(Act) >= length(Exp) of
272275
true -> ok;
273276
false -> error({expected, Exp, actual, Act})
274277
end,

deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,21 +1152,24 @@ queues_test(Config) ->
11521152
durable => true,
11531153
auto_delete => false,
11541154
exclusive => false,
1155-
arguments => #{}},
1155+
arguments => #{'x-queue-type' => <<"classic">>}
1156+
},
11561157
#{name => <<"foo">>,
11571158
vhost => <<"downvhost">>,
11581159
state => <<"stopped">>,
11591160
durable => true,
11601161
auto_delete => false,
11611162
exclusive => false,
1162-
arguments => #{}}], DownQueues),
1163+
arguments => #{'x-queue-type' => <<"classic">>}
1164+
}], DownQueues),
11631165
assert_item(#{name => <<"foo">>,
11641166
vhost => <<"downvhost">>,
11651167
state => <<"stopped">>,
11661168
durable => true,
11671169
auto_delete => false,
11681170
exclusive => false,
1169-
arguments => #{}}, DownQueue),
1171+
arguments => #{'x-queue-type' => <<"classic">>}
1172+
}, DownQueue),
11701173

11711174
http_put(Config, "/queues/badvhost/bar", Good, ?NOT_FOUND),
11721175
http_put(Config, "/queues/%2F/bar",
@@ -1188,21 +1191,21 @@ queues_test(Config) ->
11881191
durable => true,
11891192
auto_delete => false,
11901193
exclusive => false,
1191-
arguments => #{},
1194+
arguments => #{'x-queue-type' => <<"classic">>},
11921195
storage_version => 2},
11931196
#{name => <<"foo">>,
11941197
vhost => <<"/">>,
11951198
durable => true,
11961199
auto_delete => false,
11971200
exclusive => false,
1198-
arguments => #{},
1201+
arguments => #{'x-queue-type' => <<"classic">>},
11991202
storage_version => 2}], Queues),
12001203
assert_item(#{name => <<"foo">>,
12011204
vhost => <<"/">>,
12021205
durable => true,
12031206
auto_delete => false,
12041207
exclusive => false,
1205-
arguments => #{},
1208+
arguments => #{'x-queue-type' => <<"classic">>},
12061209
storage_version => 2}, Queue),
12071210

12081211
http_delete(Config, "/queues/%2F/foo", {group, '2xx'}),
@@ -2242,7 +2245,8 @@ exclusive_queue_test(Config) ->
22422245
durable => false,
22432246
auto_delete => false,
22442247
exclusive => true,
2245-
arguments => #{}}, Queue),
2248+
arguments => #{'x-queue-type' => <<"classic">>}
2249+
}, Queue),
22462250
true
22472251
end),
22482252
amqp_channel:close(Ch),
@@ -2809,7 +2813,7 @@ columns_test(Config) ->
28092813
http_delete(Config, Path, [{group, '2xx'}, 404]),
28102814
http_put(Config, Path, [{arguments, [{<<"x-message-ttl">>, TTL}]}],
28112815
{group, '2xx'}),
2812-
Item = #{arguments => #{'x-message-ttl' => TTL}, name => <<"columns.test">>},
2816+
Item = #{arguments => #{'x-message-ttl' => TTL, 'x-queue-type' => <<"classic">>}, name => <<"columns.test">>},
28132817

28142818
?AWAIT(
28152819
begin

deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -381,21 +381,23 @@ queues_test(Config) ->
381381
durable => true,
382382
auto_delete => false,
383383
exclusive => false,
384-
arguments => #{}},
384+
arguments => #{'x-queue-type' => <<"classic">>}
385+
},
385386
#{name => <<"foo">>,
386387
vhost => <<"downvhost">>,
387388
state => <<"stopped">>,
388389
durable => true,
389390
auto_delete => false,
390391
exclusive => false,
391-
arguments => #{}}], DownQueues),
392+
arguments => #{'x-queue-type' => <<"classic">>}
393+
}], DownQueues),
392394
assert_item(#{name => <<"foo">>,
393395
vhost => <<"downvhost">>,
394396
state => <<"stopped">>,
395397
durable => true,
396398
auto_delete => false,
397399
exclusive => false,
398-
arguments => #{}}, DownQueue),
400+
arguments => #{'x-queue-type' => <<"classic">>}}, DownQueue),
399401

400402
http_put(Config, "/queues/badvhost/bar", Good, ?NOT_FOUND),
401403
http_put(Config, "/queues/%2F/bar",
@@ -418,7 +420,7 @@ queues_test(Config) ->
418420
durable => true,
419421
auto_delete => false,
420422
exclusive => false,
421-
arguments => #{},
423+
arguments => #{'x-queue-type' => <<"classic">>},
422424
node => NodeBin},
423425
#{name => <<"foo">>,
424426
vhost => <<"/">>,
@@ -495,7 +497,7 @@ queues_enable_totals_test(Config) ->
495497
durable => true,
496498
auto_delete => false,
497499
exclusive => false,
498-
arguments => #{},
500+
arguments => #{'x-queue-type' => <<"classic">>},
499501
node => NodeBin,
500502
messages => 1,
501503
messages_ready => 1,
@@ -882,7 +884,8 @@ exclusive_queue_test(Config) ->
882884
durable => false,
883885
auto_delete => false,
884886
exclusive => true,
885-
arguments => #{}}, Queue),
887+
arguments => #{'x-queue-type' => <<"classic">>}
888+
}, Queue),
886889
amqp_channel:close(Ch),
887890
close_connection(Conn),
888891
passed.
@@ -1514,7 +1517,7 @@ columns_test(Config) ->
15141517
http_delete(Config, Path, [{group, '2xx'}, 404]),
15151518
http_put(Config, Path, [{arguments, [{<<"x-message-ttl">>, TTL}]}],
15161519
{group, '2xx'}),
1517-
Item = #{arguments => #{'x-message-ttl' => TTL}, name => <<"columns.test">>},
1520+
Item = #{arguments => #{'x-message-ttl' => TTL, 'x-queue-type' => <<"classic">>}, name => <<"columns.test">>},
15181521
timer:sleep(2000),
15191522
[Item] = http_get(Config, "/queues?columns=arguments.x-message-ttl,name", ?OK),
15201523
Item = http_get(Config, "/queues/%2F/columns.test?columns=arguments.x-message-ttl,name", ?OK),

0 commit comments

Comments
 (0)