Skip to content

Commit

Permalink
Add queue.declare tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Mar 22, 2024
1 parent 72d8165 commit dd49f80
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 66 deletions.
98 changes: 51 additions & 47 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,19 @@ handle_http_req(<<"GET">>,
_ConnPid) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QName = rabbit_misc:r(Vhost, queue, QNameBin),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
{<<"200">>, RespPayload};
case rabbit_amqqueue:with(
QName,
fun(Q) ->
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
{ok, {<<"200">>, RespPayload}}
end) of
{ok, Result} ->
Result;
{error, not_found} ->
case rabbit_amqqueue:not_found_or_absent_dirty(QName) of
not_found ->
throw(<<"404">>, "~ts not found", [rabbit_misc:rs(QName)]);
{absent, Q, Reason} ->
absent(Q, Reason)
end
throw(<<"404">>, "~ts not found", [rabbit_misc:rs(QName)]);
{error, {absent, Q, Reason}} ->
absent(Q, Reason)
end;

handle_http_req(HttpMethod = <<"PUT">>,
Expand Down Expand Up @@ -107,44 +108,47 @@ handle_http_req(HttpMethod = <<"PUT">>,
rabbit_core_metrics:queue_declared(QName),

{Q1, NumMsgs, NumConsumers, StatusCode} =
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
try rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, QArgs, Owner) of
ok ->
{ok, Msgs, Consumers} = rabbit_amqqueue:stat(Q),
{Q, Msgs, Consumers, <<"200">>}
catch exit:#amqp_error{name = precondition_failed,
explanation = Expl} ->
throw(<<"409">>, Expl, [])
end;
case rabbit_amqqueue:with(
QName,
fun(Q) ->
try rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, QArgs, Owner) of
ok ->
{ok, Msgs, Consumers} = rabbit_amqqueue:stat(Q),
{ok, {Q, Msgs, Consumers, <<"200">>}}
catch exit:#amqp_error{name = precondition_failed,
explanation = Expl} ->
throw(<<"409">>, Expl, []);
exit:#amqp_error{explanation = Expl} ->
throw(<<"400">>, Expl, [])
end
end) of
{ok, Result} ->
Result;
{error, not_found} ->
case rabbit_amqqueue:not_found_or_absent_dirty(QName) of
not_found ->
ok = check_vhost_queue_limit(QName),
ok = check_dead_letter_exchange(QName, QArgs, User),
case rabbit_amqqueue:declare(
QName, Durable, AutoDelete, QArgs, Owner, Username) of
{new, Q} ->
rabbit_core_metrics:queue_created(QName),
{Q, 0, 0, <<"201">>};
{absent, Q, Reason} ->
absent(Q, Reason);
{existing, _Q} ->
%% Must have been created in the meantime. Loop around again.
handle_http_req(HttpMethod, PathSegments, Query,
ReqPayload, Vhost, User, ConnPid);
{owner_died, Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
{Q, 0, 0, <<"201">>};
{protocol_error, _ErrorType, Reason, ReasonArgs} ->
throw(<<"400">>, Reason, ReasonArgs)
end;
ok = check_vhost_queue_limit(QName),
ok = check_dead_letter_exchange(QName, QArgs, User),
case rabbit_amqqueue:declare(
QName, Durable, AutoDelete, QArgs, Owner, Username) of
{new, Q} ->
rabbit_core_metrics:queue_created(QName),
{Q, 0, 0, <<"201">>};
{owner_died, Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
{Q, 0, 0, <<"201">>};
{absent, Q, Reason} ->
absent(Q, Reason)
end
absent(Q, Reason);
{existing, _Q} ->
%% Must have been created in the meantime. Loop around again.
handle_http_req(HttpMethod, PathSegments, Query,
ReqPayload, Vhost, User, ConnPid);
{protocol_error, _ErrorType, Reason, ReasonArgs} ->
throw(<<"400">>, Reason, ReasonArgs)
end;
{error, {absent, Q, Reason}} ->
absent(Q, Reason)
end,

RespPayload = encode_queue(Q1, NumMsgs, NumConsumers),
Expand Down
9 changes: 1 addition & 8 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,7 @@
%% An even better approach in future would be to dynamically grow (or shrink) the link credit
%% we grant depending on how fast target queue(s) actually confirm messages.
-define(LINK_CREDIT_RCV, 128).

%% Pure HTTP clients of a future HTTP API (v2) would call endpoints as follows:
%% GET /v2/vhosts/:vhost/queues/:queue
%%
%% Here, we use the terminus address /management/v2 so that AMQP 1.0 clients declare the HTTP API version
%% at link attachment time. The vhost is already determined at AMQP connection open time.
%% Therefore, there is no need to send the HTTP API version and the vhost in every HTTP over AMQP request.
-define(MANAGEMENT_NODE_ADDRESS, <<"/management/v2">>).
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).

-export([start_link/8,
process_frame/2,
Expand Down
19 changes: 19 additions & 0 deletions deps/rabbit/test/amqp_auth_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ groups() ->
declare_queue,
declare_queue_dlx_queue,
declare_queue_dlx_exchange,
declare_queue_vhost_queue_limit,
delete_queue,
purge_queue,
bind_queue_source,
Expand Down Expand Up @@ -620,6 +621,24 @@ declare_queue_dlx_exchange(Config) ->
rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps)),
ok = close_connection_sync(Conn).

declare_queue_vhost_queue_limit(Config) ->
QName = <<"🍿"/utf8>>,
ok = set_permissions(Config, QName, <<>>, <<>>),
Vhost = proplists:get_value(test_vhost, Config),
ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, Vhost, max_queues, 0),

Init = {_, _, LinkPair} = init_pair(Config),
{error, Resp} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
?assertMatch(#{subject := <<"403">>}, amqp10_msg:properties(Resp)),
?assertEqual(
#'v1_0.amqp_value'{
content = {utf8, <<"refused to declare queue '", QName/binary, "' in vhost 'test vhost' ",
"because vhost queue limit 0 is reached">>}},
amqp10_msg:body(Resp)),

ok = cleanup_pair(Init),
ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, Vhost).

delete_queue(Config) ->
{Conn, _, LinkPair} = init_pair(Config),
QName = <<"🍿"/utf8>>,
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_amqp_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ TEST_DEPS = [
rabbitmq_integration_suite(
name = "management_SUITE",
size = "medium",
shard_count = 2,
deps = TEST_DEPS,
)

Expand Down
6 changes: 4 additions & 2 deletions deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
-include("rabbitmq_amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").

-export[attach_management_link_pair_sync/2,
-export[
%% link pair operations
attach_management_link_pair_sync/2,
detach_management_link_pair_sync/1,

%% queue operations
Expand All @@ -30,7 +32,7 @@
].

-define(TIMEOUT, 15_000).
-define(MANAGEMENT_NODE_ADDRESS, <<"/management/v2">>).
-define(MANAGEMENT_NODE_ADDRESS, <<"/management">>).

-type arguments() :: #{binary() => {atom(), term()}}.

Expand Down
Loading

0 comments on commit dd49f80

Please sign in to comment.