Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -733,31 +733,38 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) ->
#{default_queue_type := DefaultQueueType}
when is_binary(DefaultQueueType) andalso
not HasQTypeArg ->
Type = rabbit_queue_type:discover(DefaultQueueType),
IsPermitted = is_queue_args_combination_permitted(
Durable, Exclusive),
IsCompatible = rabbit_queue_type:is_compatible(
Type, Durable, Exclusive, AutoDelete),
case IsPermitted andalso IsCompatible of
true ->
%% patch up declare arguments with x-queue-type if there
%% is a vhost default set the queue is durable and not exclusive
%% and there is no queue type argument
%% present
rabbit_misc:set_table_value(Args0,
<<"x-queue-type">>,
longstr,
DefaultQueueType);
false ->
%% if the properties are incompatible with the declared
%% DQT, use the fall back type
rabbit_misc:set_table_value(Args0,
<<"x-queue-type">>,
longstr,
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
end;
update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args0);
_ ->
Args0
case HasQTypeArg of
true -> Args0;
false ->
update_args_table_with_queue_type(rabbit_queue_type:short_alias_of(rabbit_queue_type:default()), Durable, Exclusive, AutoDelete, Args0)
end
end.

update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args) ->
Type = rabbit_queue_type:discover(DefaultQueueType),
IsPermitted = is_queue_args_combination_permitted(
Durable, Exclusive),
IsCompatible = rabbit_queue_type:is_compatible(
Type, Durable, Exclusive, AutoDelete),
case IsPermitted andalso IsCompatible of
true ->
%% patch up declare arguments with x-queue-type if there
%% is a vhost default set the queue is durable and not exclusive
%% and there is no queue type argument
%% present
rabbit_misc:set_table_value(Args,
<<"x-queue-type">>,
longstr,
DefaultQueueType);
false ->
%% if the properties are incompatible with the declared
%% DQT, use the fall back type
rabbit_misc:set_table_value(Args,
<<"x-queue-type">>,
longstr,
rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback()))
end.

-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,23 @@ short_alias_of(<<"rabbit_quorum_queue">>) ->
<<"quorum">>;
short_alias_of(rabbit_quorum_queue) ->
<<"quorum">>;
%% AMQP 1.0 management client
short_alias_of({utf8, <<"quorum">>}) ->
<<"quorum">>;
short_alias_of(<<"rabbit_classic_queue">>) ->
<<"classic">>;
short_alias_of(rabbit_classic_queue) ->
<<"classic">>;
%% AMQP 1.0 management client
short_alias_of({utf8, <<"classic">>}) ->
<<"classic">>;
short_alias_of(<<"rabbit_stream_queue">>) ->
<<"stream">>;
short_alias_of(rabbit_stream_queue) ->
<<"stream">>;
%% AMQP 1.0 management client
short_alias_of({utf8, <<"stream">>}) ->
<<"stream">>;
short_alias_of(_Other) ->
undefined.

Expand Down
41 changes: 38 additions & 3 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ all_tests() ->
server_system_recover,
vhost_with_quorum_queue_is_deleted,
vhost_with_default_queue_type_declares_quorum_queue,
node_wide_default_queue_type_declares_quorum_queue,
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
Expand Down Expand Up @@ -604,7 +605,7 @@ start_queue_concurrent(Config) ->
quorum_cluster_size_3(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "quorum_cluster_size_3 tests isn't mixed version reliable"};
{skip, "quorum_cluster_size_3 test isn't mixed version reliable"};
false ->
quorum_cluster_size_x(Config, 3, 3)
end.
Expand Down Expand Up @@ -829,6 +830,40 @@ vhost_with_default_queue_type_declares_quorum_queue(Config) ->
amqp_connection:close(Conn),
ok.

node_wide_default_queue_type_declares_quorum_queue(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "node_wide_default_queue_type_declares_quorum_queue test isn't mixed version compatible"};
false ->
node_wide_default_queue_type_declares_quorum_queue0(Config)
end.

node_wide_default_queue_type_declares_quorum_queue0(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
rpc:call(Node, application, set_env, [rabbit, default_queue_type, rabbit_quorum_queue]),
VHost = atom_to_binary(?FUNCTION_NAME, utf8),
QName = atom_to_binary(?FUNCTION_NAME, utf8),
User = ?config(rmq_username, Config),

AddVhostArgs = [VHost, #{}, User],
ok = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_vhost, add,
AddVhostArgs),
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost),
{ok, Ch} = amqp_connection:open_channel(Conn),
?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])),
assert_queue_type(Node, VHost, QName, rabbit_quorum_queue),
?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])),
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QName, 0, 0}, declare_passive(Ch, QName, [])),
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare_passive(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
amqp_connection:close(Conn),

rpc:call(Node, application, set_env, [rabbit, default_queue_type, rabbit_classic_queue]),
ok.

restart_all_types(Config) ->
%% Test the node restart with both types of queues (quorum and classic) to
%% ensure there are no regressions
Expand Down Expand Up @@ -1236,7 +1271,7 @@ shrink_all(Config) ->
rebalance(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "rebalance tests isn't mixed version compatible"};
{skip, "rebalance test isn't mixed version compatible"};
false ->
rebalance0(Config)
end.
Expand Down Expand Up @@ -1704,7 +1739,7 @@ leadership_takeover(Config) ->
metrics_cleanup_on_leadership_takeover(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "metrics_cleanup_on_leadership_takeover tests isn't mixed version compatible"};
{skip, "metrics_cleanup_on_leadership_takeover test isn't mixed version compatible"};
false ->
metrics_cleanup_on_leadership_takeover0(Config)
end.
Expand Down
9 changes: 5 additions & 4 deletions deps/rabbitmq_amqp_client/test/management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

end_per_testcase(Testcase, Config) ->
%% Assert that every testcase cleaned up.
%% Ensure that all queues were cleaned up
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
rabbit_ct_helpers:testcase_finished(Config, Testcase).

Expand Down Expand Up @@ -268,12 +268,12 @@ all_management_operations(Config) ->
queue_defaults(Config) ->
Init = {_, LinkPair} = init(Config),
QName = atom_to_binary(?FUNCTION_NAME),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
[Q] = rpc(Config, rabbit_amqqueue, list, []),
{ok, Q} = rpc(Config, rabbit_amqqueue, lookup, [QName, <<"/">>]),
?assert(rpc(Config, amqqueue, is_durable, [Q])),
?assertNot(rpc(Config, amqqueue, is_exclusive, [Q])),
?assertNot(rpc(Config, amqqueue, is_auto_delete, [Q])),
?assertEqual([], rpc(Config, amqqueue, get_arguments, [Q])),

{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).
Expand Down Expand Up @@ -448,10 +448,11 @@ declare_queue_default_queue_type(Config) ->
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),

{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
?assertMatch({ok, #{type := <<"quorum">>}},
rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{})),

{ok, #{}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
Expand Down
5 changes: 4 additions & 1 deletion deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ atomize_map_keys(I) ->

%% @todo There wasn't a specific order before; now there is; maybe we shouldn't have one?
assert_list(Exp, Act) ->
case length(Exp) == length(Act) of
%% allow actual map to include keys we do not assert on
%% but not the other way around: we may want to only assert on a subset
%% of keys
case length(Act) >= length(Exp) of
true -> ok;
false -> error({expected, Exp, actual, Act})
end,
Expand Down
20 changes: 12 additions & 8 deletions deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1152,21 +1152,24 @@ queues_test(Config) ->
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{}},
arguments => #{'x-queue-type' => <<"classic">>}
},
#{name => <<"foo">>,
vhost => <<"downvhost">>,
state => <<"stopped">>,
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{}}], DownQueues),
arguments => #{'x-queue-type' => <<"classic">>}
}], DownQueues),
assert_item(#{name => <<"foo">>,
vhost => <<"downvhost">>,
state => <<"stopped">>,
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{}}, DownQueue),
arguments => #{'x-queue-type' => <<"classic">>}
}, DownQueue),

http_put(Config, "/queues/badvhost/bar", Good, ?NOT_FOUND),
http_put(Config, "/queues/%2F/bar",
Expand All @@ -1188,21 +1191,21 @@ queues_test(Config) ->
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{},
arguments => #{'x-queue-type' => <<"classic">>},
storage_version => 2},
#{name => <<"foo">>,
vhost => <<"/">>,
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{},
arguments => #{'x-queue-type' => <<"classic">>},
storage_version => 2}], Queues),
assert_item(#{name => <<"foo">>,
vhost => <<"/">>,
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{},
arguments => #{'x-queue-type' => <<"classic">>},
storage_version => 2}, Queue),

http_delete(Config, "/queues/%2F/foo", {group, '2xx'}),
Expand Down Expand Up @@ -2242,7 +2245,8 @@ exclusive_queue_test(Config) ->
durable => false,
auto_delete => false,
exclusive => true,
arguments => #{}}, Queue),
arguments => #{'x-queue-type' => <<"classic">>}
}, Queue),
true
end),
amqp_channel:close(Ch),
Expand Down Expand Up @@ -2809,7 +2813,7 @@ columns_test(Config) ->
http_delete(Config, Path, [{group, '2xx'}, 404]),
http_put(Config, Path, [{arguments, [{<<"x-message-ttl">>, TTL}]}],
{group, '2xx'}),
Item = #{arguments => #{'x-message-ttl' => TTL}, name => <<"columns.test">>},
Item = #{arguments => #{'x-message-ttl' => TTL, 'x-queue-type' => <<"classic">>}, name => <<"columns.test">>},

?AWAIT(
begin
Expand Down
17 changes: 10 additions & 7 deletions deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,21 +381,23 @@ queues_test(Config) ->
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{}},
arguments => #{'x-queue-type' => <<"classic">>}
},
#{name => <<"foo">>,
vhost => <<"downvhost">>,
state => <<"stopped">>,
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{}}], DownQueues),
arguments => #{'x-queue-type' => <<"classic">>}
}], DownQueues),
assert_item(#{name => <<"foo">>,
vhost => <<"downvhost">>,
state => <<"stopped">>,
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{}}, DownQueue),
arguments => #{'x-queue-type' => <<"classic">>}}, DownQueue),

http_put(Config, "/queues/badvhost/bar", Good, ?NOT_FOUND),
http_put(Config, "/queues/%2F/bar",
Expand All @@ -418,7 +420,7 @@ queues_test(Config) ->
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{},
arguments => #{'x-queue-type' => <<"classic">>},
node => NodeBin},
#{name => <<"foo">>,
vhost => <<"/">>,
Expand Down Expand Up @@ -495,7 +497,7 @@ queues_enable_totals_test(Config) ->
durable => true,
auto_delete => false,
exclusive => false,
arguments => #{},
arguments => #{'x-queue-type' => <<"classic">>},
node => NodeBin,
messages => 1,
messages_ready => 1,
Expand Down Expand Up @@ -882,7 +884,8 @@ exclusive_queue_test(Config) ->
durable => false,
auto_delete => false,
exclusive => true,
arguments => #{}}, Queue),
arguments => #{'x-queue-type' => <<"classic">>}
}, Queue),
amqp_channel:close(Ch),
close_connection(Conn),
passed.
Expand Down Expand Up @@ -1514,7 +1517,7 @@ columns_test(Config) ->
http_delete(Config, Path, [{group, '2xx'}, 404]),
http_put(Config, Path, [{arguments, [{<<"x-message-ttl">>, TTL}]}],
{group, '2xx'}),
Item = #{arguments => #{'x-message-ttl' => TTL}, name => <<"columns.test">>},
Item = #{arguments => #{'x-message-ttl' => TTL, 'x-queue-type' => <<"classic">>}, name => <<"columns.test">>},
timer:sleep(2000),
[Item] = http_get(Config, "/queues?columns=arguments.x-message-ttl,name", ?OK),
Item = http_get(Config, "/queues/%2F/columns.test?columns=arguments.x-message-ttl,name", ?OK),
Expand Down