Skip to content

Commit

Permalink
Add queue topology test
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Mar 22, 2024
1 parent dd49f80 commit ce8324d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 20 deletions.
34 changes: 24 additions & 10 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
QArgs = args_amqpl_to_amqp(QArgs091),
{Leader, Replicas} = queue_topology(Q),
KVList0 = [
{{utf8, <<"replicas">>}, {array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]}},
{{utf8, <<"message_count">>}, {ulong, NumMsgs}},
{{utf8, <<"consumer_count">>}, {uint, NumConsumers}},
{{utf8, <<"name">>}, {utf8, QNameBin}},
Expand All @@ -383,28 +382,43 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
{{utf8, <<"type">>}, {utf8, rabbit_queue_type:to_binary(QType)}},
{{utf8, <<"arguments">>}, QArgs}
],
KVList1 = if is_list(Replicas) ->
[{{utf8, <<"replicas">>},
{array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]}
} | KVList0];
Replicas =:= undefined ->
KVList0
end,
KVList = if is_atom(Leader) ->
[{{utf8, <<"leader">>}, {utf8, atom_to_binary(Leader)}} | KVList0];
[{{utf8, <<"leader">>},
{utf8, atom_to_binary(Leader)}
} | KVList1];
Leader =:= undefined ->
KVList0
KVList1
end,
{map, KVList}.

%% The returned Replicas contain both online and offline replicas.
-spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: undefined | node(), Replicas :: [node(),...]}.
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
queue_topology(Q) ->
case amqqueue:get_type(Q) of
rabbit_quorum_queue ->
Leader = amqqueue:get_leader(Q),
[{members, Members}] = rabbit_quorum_queue:info(Q, [members]),
{Leader, Members};
rabbit_stream_queue ->
%% TODO How to best determine writer and replicas?
% #{name := StreamId} = amqqueue:get_type_state(Q),
% rabbit_stream_coordinator:members(StreamId)
[{leader, Leader},
{members, Members}] = rabbit_stream_queue:info(Q, [leader, members]),
{Leader, Members};
#{name := StreamId} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:members(StreamId) of
{ok, Members} ->
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
{Node, [Node | Replicas]};
(Node, {_Pid, replica}, {Writer, Replicas}) ->
{Writer, [Node | Replicas]}
end, {undefined, []}, Members);
{error, _} ->
{undefined, undefined}
end;
_ ->
Pid = amqqueue:get_pid(Q),
Node = node(Pid),
Expand Down
78 changes: 68 additions & 10 deletions deps/rabbitmq_amqp_client/test/management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ groups() ->
purge_stream
]},
{cluster_size_3, [shuffle],
[classic_queue_stopped
[classic_queue_stopped,
queue_topology
]}
].

Expand Down Expand Up @@ -563,19 +564,15 @@ declare_exchange_inequivalent_fields(Config) ->
ok = cleanup(Init).

classic_queue_stopped(Config) ->
OpnConf2 = connection_config(Config, 2),
{ok, Conn2} = amqp10_client:open_connection(OpnConf2),
{ok, Session2} = amqp10_client:begin_session_sync(Conn2),
{ok, LinkPair2} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session2, <<"my link pair">>),

Init2 = {_, _, LinkPair2} = init(Config, 2),
QName = <<"👌"/utf8>>,
{ok, #{durable := true,
type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(LinkPair2, QName, #{}),
ok = amqp10_client:close_connection(Conn2),
ok = cleanup(Init2),
ok = rabbit_ct_broker_helpers:stop_node(Config, 2),
%% Classic queue is now stopped.

Init = {_, _, LinkPair0} = init(Config),
Init0 = {_, _, LinkPair0} = init(Config),
{error, Resp0} = rabbitmq_amqp_client:declare_queue(LinkPair0, QName, #{}),
?assertMatch(#{subject := <<"400">>}, amqp10_msg:properties(Resp0)),
ExpectedResponseBody = #'v1_0.amqp_value'{
Expand All @@ -591,7 +588,7 @@ classic_queue_stopped(Config) ->

ok = rabbit_ct_broker_helpers:start_node(Config, 2),
{ok, #{}} = rabbitmq_amqp_client:delete_queue(LinkPair0, QName),
ok = cleanup(Init).
ok = cleanup(Init0).

delete_default_exchange(Config) ->
Init = {_, _, LinkPair} = init(Config),
Expand Down Expand Up @@ -731,8 +728,69 @@ purge_stream(Config) ->
{ok, #{}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = cleanup(Init).

queue_topology(Config) ->
NodeNames = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Nodes = [N0, N1, N2] = lists:map(fun erlang:atom_to_binary/1, NodeNames),
Init0 = {_, _, LinkPair0} = init(Config, 0),

CQName = <<"my classic queue">>,
QQName = <<"my quorum queue">>,
SQName = <<"my stream queue">>,

CQProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"classic">>}}},
QQProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}},
SQProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}},

{ok, CQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, CQName, CQProps),
{ok, QQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, QQName, QQProps),
{ok, SQInfo0} = rabbitmq_amqp_client:declare_queue(LinkPair0, SQName, SQProps),

%% The default queue leader strategy is client-local.
?assertEqual({ok, N0}, maps:find(leader, CQInfo0)),
?assertEqual({ok, N0}, maps:find(leader, QQInfo0)),
?assertEqual({ok, N0}, maps:find(leader, SQInfo0)),

?assertEqual({ok, [N0]}, maps:find(replicas, CQInfo0)),
{ok, QQReplicas0} = maps:find(replicas, QQInfo0),
?assertEqual(Nodes, lists:usort(QQReplicas0)),
{ok, SQReplicas0} = maps:find(replicas, SQInfo0),
?assertEqual(Nodes, lists:usort(SQReplicas0)),

ok = cleanup(Init0),
ok = rabbit_ct_broker_helpers:stop_node(Config, 0),

Init2 = {_, _, LinkPair2} = init(Config, 2),
{ok, QQInfo2} = rabbitmq_amqp_client:get_queue(LinkPair2, QQName),
{ok, SQInfo2} = rabbitmq_amqp_client:get_queue(LinkPair2, SQName),

case maps:get(leader, QQInfo2) of
N1 -> ok;
N2 -> ok;
Other0 -> ct:fail({?LINE, Other0})
end,
case maps:get(leader, SQInfo2) of
N1 -> ok;
N2 -> ok;
Other1 -> ct:fail({?LINE, Other1})
end,

%% Replicas should include both online and offline replicas.
{ok, QQReplicas2} = maps:find(replicas, QQInfo2),
?assertEqual(Nodes, lists:usort(QQReplicas2)),
{ok, SQReplicas2} = maps:find(replicas, SQInfo2),
?assertEqual(Nodes, lists:usort(SQReplicas2)),

ok = rabbit_ct_broker_helpers:start_node(Config, 0),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, CQName),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, QQName),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, SQName),
ok = cleanup(Init2).

init(Config) ->
OpnConf = connection_config(Config),
init(Config, 0).

init(Config, Node) ->
OpnConf = connection_config(Config, Node),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>),
Expand Down

0 comments on commit ce8324d

Please sign in to comment.