Skip to content

Commit 3dfdb3d

Browse files
michaelklishinmergify[bot]
authored andcommitted
Make 'queue.declare' aware of virtual host DQT
at validation time. DQT = default queue type. When a client provides no queue type, validation should take the defaults (virtual host, global, and the last resort fallback) into account instead of considering the type to be "undefined". References #11457 ##11528 (cherry picked from commit f3b7a34) # Conflicts: # deps/rabbit/src/rabbit_queue_type.erl # deps/rabbit/src/rabbit_vhost.erl
1 parent 3bbca4b commit 3dfdb3d

File tree

4 files changed

+121
-9
lines changed

4 files changed

+121
-9
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
-export([is_server_named_allowed/1]).
6868

6969
-export([check_max_age/1]).
70-
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
70+
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
7171

7272
-export([deactivate_limit_all/2]).
7373

@@ -224,8 +224,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) ->
224224
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
225225
declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
226226
Owner, ActingUser, Node) ->
227-
ok = check_declare_arguments(QueueName, Args),
228-
Type = get_queue_type(Args),
227+
%% note: this is a module name, not a shortcut such as <<"quorum">>
228+
DQT = rabbit_vhost:default_queue_type(VHost, rabbit_queue_type:fallback()),
229+
ok = check_declare_arguments(QueueName, Args, DQT),
230+
Type = get_queue_type(Args, DQT),
229231
case rabbit_queue_type:is_enabled(Type) of
230232
true ->
231233
Q = amqqueue:new(QueueName,
@@ -252,10 +254,25 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
252254
[rabbit_misc:rs(QueueName), Type, Node]}
253255
end.
254256

257+
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
258+
%% This version is not virtual host metadata-aware but will use
259+
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
260+
get_queue_type([]) ->
261+
rabbit_queue_type:default();
255262
get_queue_type(Args) ->
263+
get_queue_type(Args, rabbit_queue_type:default()).
264+
265+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
266+
get_queue_type([], DefaultQueueType) ->
267+
rabbit_queue_type:discover(DefaultQueueType);
268+
get_queue_type(Args, DefaultQueueType) ->
256269
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
257270
undefined ->
258-
rabbit_queue_type:default();
271+
rabbit_queue_type:discover(DefaultQueueType);
272+
{longstr, undefined} ->
273+
rabbit_queue_type:discover(DefaultQueueType);
274+
{longstr, <<"undefined">>} ->
275+
rabbit_queue_type:discover(DefaultQueueType);
259276
{_, V} ->
260277
rabbit_queue_type:discover(V)
261278
end.
@@ -804,7 +821,33 @@ assert_args_equivalence(Q, NewArgs) ->
804821
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),
805822
rabbit_misc:assert_args_equivalence(ExistingArgs, NewArgs, QueueName, QueueTypeArgs).
806823

807-
check_declare_arguments(QueueName, Args) ->
824+
-spec maybe_inject_default_queue_type_shortcut_into_args(
825+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
826+
maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType) ->
827+
case rabbit_misc:table_lookup(Args0, <<"x-queue-type">>) of
828+
undefined ->
829+
inject_default_queue_type_shortcut_into_args([], DefaultQueueType);
830+
{longstr, undefined} ->
831+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
832+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
833+
{longstr, <<"undefined">>} ->
834+
%% Important: use a shortcut such as 'quorum' or 'stream' that for the given queue type module
835+
inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType);
836+
_ValueIsAlreadySet ->
837+
Args0
838+
end.
839+
840+
-spec inject_default_queue_type_shortcut_into_args(
841+
rabbit_framing:amqp_table(), rabbit_queue_type:queue_type()) -> rabbit_framing:amqp_table().
842+
inject_default_queue_type_shortcut_into_args(Args0, QueueType) ->
843+
Shortcut = rabbit_queue_type:short_alias_of(QueueType),
844+
NewVal = rabbit_data_coercion:to_binary(Shortcut),
845+
rabbit_misc:set_table_value(Args0, <<"x-queue-type">>, longstr, NewVal).
846+
847+
check_declare_arguments(QueueName, Args0, DefaultQueueType) ->
848+
%% If the x-queue-type was not provided by the client, inject the
849+
%% (virtual host, global or fallback) default before performing validation. MK.
850+
Args = maybe_inject_default_queue_type_shortcut_into_args(Args0, DefaultQueueType),
808851
check_arguments_type_and_value(QueueName, Args, [{<<"x-queue-type">>, fun check_queue_type/2}]),
809852
Type = get_queue_type(Args),
810853
QueueTypeArgs = rabbit_queue_type:arguments(queue_arguments, Type),

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
init/0,
1818
close/1,
1919
discover/1,
20+
short_alias_of/1,
2021
feature_flag_name/1,
2122
default/0,
23+
fallback/0,
2224
is_enabled/1,
2325
is_compatible/4,
2426
declare/2,
@@ -66,9 +68,16 @@
6668
-type queue_state() :: term().
6769
-type msg_tag() :: term().
6870
-type arguments() :: queue_arguments | consumer_arguments.
71+
<<<<<<< HEAD
6972
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue.
7073

7174
-export_type([queue_type/0]).
75+
=======
76+
-type queue_type() :: rabbit_classic_queue | rabbit_quorum_queue | rabbit_stream_queue | module().
77+
%% see AMQP 1.0 §2.6.7
78+
-type delivery_count() :: sequence_no().
79+
-type credit() :: uint().
80+
>>>>>>> f3b7a346f9 (Make 'queue.declare' aware of virtual host DQT)
7281

7382
-define(STATE, ?MODULE).
7483

@@ -239,20 +248,49 @@
239248
-callback notify_decorators(amqqueue:amqqueue()) ->
240249
ok.
241250

251+
-spec discover(binary() | atom()) -> queue_type().
252+
discover(<<"undefined">>) ->
253+
fallback();
254+
discover(undefined) ->
255+
fallback();
242256
%% TODO: should this use a registry that's populated on boot?
243257
discover(<<"quorum">>) ->
244258
rabbit_quorum_queue;
259+
discover(rabbit_quorum_queue) ->
260+
rabbit_quorum_queue;
245261
discover(<<"classic">>) ->
246262
rabbit_classic_queue;
263+
discover(rabbit_classic_queue) ->
264+
rabbit_classic_queue;
265+
discover(rabbit_stream_queue) ->
266+
rabbit_stream_queue;
247267
discover(<<"stream">>) ->
248268
rabbit_stream_queue;
249269
discover(Other) when is_atom(Other) ->
250270
discover(rabbit_data_coercion:to_binary(Other));
251271
discover(Other) when is_binary(Other) ->
252272
T = rabbit_registry:binary_to_type(Other),
273+
rabbit_log:debug("Queue type discovery: will look up a module for type '~tp'", [T]),
253274
{ok, Mod} = rabbit_registry:lookup_module(queue, T),
254275
Mod.
255276

277+
-spec short_alias_of(queue_type()) -> binary().
278+
%% The opposite of discover/1: returns a short alias given a module name
279+
short_alias_of(<<"rabbit_quorum_queue">>) ->
280+
<<"quorum">>;
281+
short_alias_of(rabbit_quorum_queue) ->
282+
<<"quorum">>;
283+
short_alias_of(<<"rabbit_classic_queue">>) ->
284+
<<"classic">>;
285+
short_alias_of(rabbit_classic_queue) ->
286+
<<"classic">>;
287+
short_alias_of(<<"rabbit_stream_queue">>) ->
288+
<<"stream">>;
289+
short_alias_of(rabbit_stream_queue) ->
290+
<<"stream">>;
291+
short_alias_of(_Other) ->
292+
undefined.
293+
256294
feature_flag_name(<<"quorum">>) ->
257295
quorum_queue;
258296
feature_flag_name(<<"classic">>) ->
@@ -262,10 +300,19 @@ feature_flag_name(<<"stream">>) ->
262300
feature_flag_name(_) ->
263301
undefined.
264302

303+
%% If the client does not specify the type, the virtual host does not have any
304+
%% metadata default, and rabbit.default_queue_type is not set in the application env,
305+
%% use this type as the last resort.
306+
-spec fallback() -> queue_type().
307+
fallback() ->
308+
rabbit_classic_queue.
309+
310+
-spec default() -> queue_type().
265311
default() ->
266-
rabbit_misc:get_env(rabbit,
267-
default_queue_type,
268-
rabbit_classic_queue).
312+
V = rabbit_misc:get_env(rabbit,
313+
default_queue_type,
314+
fallback()),
315+
rabbit_data_coercion:to_atom(V).
269316

270317
%% is a specific queue type implementation enabled
271318
-spec is_enabled(module()) -> boolean().

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, msg_store_dir_base/0, config_file_path/1, ensure_config_file/1]).
2323
-export([delete_storage/1]).
2424
-export([vhost_down/1]).
25+
<<<<<<< HEAD
2526
-export([put_vhost/5,
2627
put_vhost/6]).
28+
=======
29+
-export([put_vhost/6]).
30+
-export([default_queue_type/1, default_queue_type/2]).
31+
>>>>>>> f3b7a346f9 (Make 'queue.declare' aware of virtual host DQT)
2732

2833
%%
2934
%% API
@@ -503,6 +508,22 @@ default_name() ->
503508
undefined -> <<"/">>
504509
end.
505510

511+
-spec default_queue_type(VirtualHost :: vhost:name()) -> rabbit_queue_type:queue_type().
512+
default_queue_type(VirtualHost) ->
513+
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
514+
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
515+
default_queue_type(VirtualHost, FallbackQueueType) ->
516+
case exists(VirtualHost) of
517+
false -> FallbackQueueType;
518+
true ->
519+
Record = lookup(VirtualHost),
520+
case vhost:get_default_queue_type(Record) of
521+
undefined -> FallbackQueueType;
522+
<<"undefined">> -> FallbackQueueType;
523+
Type -> Type
524+
end
525+
end.
526+
506527
-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
507528
lookup(VHostName) ->
508529
case rabbit_db_vhost:get(VHostName) of

deps/rabbitmq_management/test/stats_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(stats_SUITE).
99

1010
-include_lib("proper/include/proper.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
1112
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
1213

1314
-compile(export_all).
@@ -175,4 +176,4 @@ format_range_constant(_Config) ->
175176
SamplesFun),
176177
5 = proplists:get_value(publish, Got),
177178
PD = proplists:get_value(publish_details, Got),
178-
0.0 = proplists:get_value(rate, PD).
179+
?assertEqual(0.0, proplists:get_value(rate, PD)).

0 commit comments

Comments
 (0)