Skip to content

Commit 4e71683

Browse files
Make 'queue.declare' aware of virtual host DQT
at validation time. DQT = default queue type. When a client provides no queue type, validation should take the defaults (virtual host, global, and the last resort fallback) into account instead of considering the type to be "undefined". References #11457 ##11528
1 parent f5cb65b commit 4e71683

File tree

4 files changed

+104
-11
lines changed

4 files changed

+104
-11
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
-export([is_server_named_allowed/1]).
6464

6565
-export([check_max_age/1]).
66-
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
66+
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
6767

6868
-export([deactivate_limit_all/2]).
6969

@@ -220,8 +220,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
220220
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
221221
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
222222
Owner, ActingUser, Node) ->
223-
ok = check_declare_arguments(QueueName, Args),
224-
Type = get_queue_type(Args),
223+
%% note: this is a module name, not a shortcut such as <<"quorum">>
224+
DQT = rabbit_vhost:default_queue_type(VHost, rabbit_queue_type:fallback()),
225+
ok = check_declare_arguments(QueueName, Args, DQT),
226+
Type = get_queue_type(Args, DQT),
225227
case rabbit_queue_type:is_enabled(Type) of
226228
true ->
227229
Q = amqqueue:new(QueueName,
@@ -248,10 +250,19 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
248250
[rabbit_misc:rs(QueueName), Type, Node]}
249251
end.
250252

253+
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
254+
%% This version is not virtual host metadata-aware but will use
255+
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
251256
get_queue_type(Args) ->
257+
get_queue_type(Args, rabbit_queue_type:default()).
258+
259+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
260+
get_queue_type(Args, DefaultQueueType) ->
252261
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
253-
undefined ->
254-
rabbit_queue_type:default();
262+
{longstr, undefined} ->
263+
rabbit_queue_type:discover(DefaultQueueType);
264+
{longstr, <<"undefined">>} ->
265+
rabbit_queue_type:discover(DefaultQueueType);
255266
{_, V} ->
256267
rabbit_queue_type:discover(V)
257268
end.
@@ -783,7 +794,31 @@ assert_args_equivalence(Q, NewArgs) ->
783794
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
784795
rabbit_misc:assert_args_equivalence(ExistingArgs, NewArgs, QueueName, QueueTypeArgs).
785796

786-
check_declare_arguments(QueueName, Args) ->
797+
-spec maybe_inject_default_queue_type_shortcut_into_args(
798+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
799+
maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType) ->
800+
case rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) of
801+
{longstr, undefined} ->
802+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
803+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
804+
{longstr, <<"undefined">>} ->
805+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
806+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
807+
_ValueIsAlreadySet ->
808+
Args0
809+
end.
810+
811+
-spec inject_default_queue_type_shortcut_into_args(
812+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
813+
inject_default_queue_type_shortcut_into_args(Args0, QueueType) ->
814+
Shortcut = rabbit_queue_type:short_alias_of(QueueType),
815+
NewVal = rabbit_data_coercion:to_binary(Shortcut),
816+
rabbit_misc:set_table_value(Args0, <<"x-queue-type">>, longstr, NewVal).
817+
818+
check_declare_arguments(QueueName, Args0, DefaultQueueType) ->
819+
%% If the x-queue-type was not provided by the client, inject the
820+
%% (virtual host, global or fallback) default before performing validation. MK.
821+
Args = maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType),
787822
check_arguments_type_and_value(QueueName, Args, [{<<"x-queue-type">>, fun check_queue_type/2}]),
788823
Type = get_queue_type(Args),
789824
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
init/0,
1919
close/1,
2020
discover/1,
21+
short_alias_of/1,
2122
feature_flag_name/1,
2223
to_binary/1,
2324
default/0,
25+
fallback/0,
2426
is_enabled/1,
2527
is_compatible/4,
2628
declare/2,
@@ -70,7 +72,7 @@
7072
%% sequence number typically
7173
-type correlation() :: term().
7274
-type arguments() :: queue_arguments | consumer_arguments.
73-
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
75+
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
7476
%% see AMQP 1.0 §2.6.7
7577
-type delivery_count() :: sequence_no().
7678
-type credit() :: uint().
@@ -253,20 +255,49 @@
253255
-callback notify_decorators(amqqueue:amqqueue()) ->
254256
ok.
255257

258+
-spec discover(binary() | atom()) -> queue_type().
259+
discover(<<"undefined">>) ->
260+
fallback();
261+
discover(undefined) ->
262+
fallback();
256263
%% TODO: should this use a registry that's populated on boot?
257264
discover(<<"quorum">>) ->
258265
rabbit_quorum_queue;
266+
discover(rabbit_quorum_queue) ->
267+
rabbit_quorum_queue;
259268
discover(<<"classic">>) ->
260269
rabbit_classic_queue;
270+
discover(rabbit_classic_queue) ->
271+
rabbit_classic_queue;
272+
discover(rabbit_stream_queue) ->
273+
rabbit_stream_queue;
261274
discover(<<"stream">>) ->
262275
rabbit_stream_queue;
263276
discover(Other) when is_atom(Other) ->
264277
discover(rabbit_data_coercion:to_binary(Other));
265278
discover(Other) when is_binary(Other) ->
266279
T = rabbit_registry:binary_to_type(Other),
280+
rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
267281
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
268282
Mod.
269283

284+
-spec short_alias_of(queue_type()) -> binary().
285+
%% The opposite of discover/1: returns a short alias given a module name
286+
short_alias_of(<<"rabbit_quorum_queue">>) ->
287+
<<"quorum">>;
288+
short_alias_of(rabbit_quorum_queue) ->
289+
<<"quorum">>;
290+
short_alias_of(<<"rabbit_classic_queue">>) ->
291+
<<"classic">>;
292+
short_alias_of(rabbit_classic_queue) ->
293+
<<"classic">>;
294+
short_alias_of(<<"rabbit_stream_queue">>) ->
295+
<<"stream">>;
296+
short_alias_of(rabbit_stream_queue) ->
297+
<<"stream">>;
298+
short_alias_of(_Other) ->
299+
undefined.
300+
270301
feature_flag_name(<<"quorum">>) ->
271302
quorum_queue;
272303
feature_flag_name(<<"classic">>) ->
@@ -276,10 +307,19 @@ feature_flag_name(<<"stream">>) ->
276307
feature_flag_name(_) ->
277308
undefined.
278309

310+
%% If the client does not specify the type, the virtual host does not have any
311+
%% metadata default, and rabbit.default_queue_type is not set in the application env,
312+
%% use this type as the last resort.
313+
-spec fallback() -> queue_type().
314+
fallback() ->
315+
rabbit_classic_queue.
316+
317+
-spec default() -> queue_type().
279318
default() ->
280-
rabbit_misc:get_env(rabbit,
281-
default_queue_type,
282-
rabbit_classic_queue).
319+
V = rabbit_misc:get_env(rabbit,
320+
default_queue_type,
321+
fallback()),
322+
rabbit_data_coercion:to_atom(V).
283323

284324
-spec to_binary(module()) -> binary().
285325
to_binary(rabbit_classic_queue) ->

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
-export([delete_storage/1]).
2323
-export([vhost_down/1]).
2424
-export([put_vhost/6]).
25+
-export([default_queue_type/1, default_queue_type/2]).
2526

2627
%%
2728
%% API
@@ -481,6 +482,22 @@ default_name() ->
481482
undefined -> <<"/">>
482483
end.
483484

485+
-spec default_queue_type(VirtualHost :: vhost:name()) -> rabbit_queue_type:queue_type().
486+
default_queue_type(VirtualHost) ->
487+
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
488+
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
489+
default_queue_type(VirtualHost, FallbackQueueType) ->
490+
case exists(VirtualHost) of
491+
false -> FallbackQueueType;
492+
true ->
493+
Record = lookup(VirtualHost),
494+
case vhost:get_default_queue_type(Record) of
495+
undefined -> FallbackQueueType;
496+
<<"undefined">> -> FallbackQueueType;
497+
Type -> Type
498+
end
499+
end.
500+
484501
-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
485502
lookup(VHostName) ->
486503
case rabbit_db_vhost:get(VHostName) of

deps/rabbitmq_management/test/stats_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(stats_SUITE).
99

1010
-include_lib("proper/include/proper.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
1112
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
1213

1314
-compile(export_all).
@@ -175,4 +176,4 @@ format_range_constant(_Config) ->
175176
SamplesFun),
176177
5 = proplists:get_value(publish, Got),
177178
PD = proplists:get_value(publish_details, Got),
178-
0.0 = proplists:get_value(rate, PD).
179+
?assertEqual(0.0, proplists:get_value(rate, PD)).

0 commit comments

Comments
 (0)