Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
set_decorators/2,
% exclusive_owner
get_exclusive_owner/1,
get_leader/1,
get_leader_node/1,
get_nodes/1,
% name (#resource)
get_name/1,
set_name/2,
Expand Down Expand Up @@ -387,9 +388,21 @@ set_decorators(#amqqueue{} = Queue, Decorators) ->
get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
Owner.

-spec get_leader(amqqueue_v2()) -> node().
-spec get_leader_node(amqqueue_v2()) -> node() | none.

get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
get_leader_node(#amqqueue{pid = none}) -> none;
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).

-spec get_nodes(amqqueue_v2()) -> [node(),...].

get_nodes(Q) ->
case amqqueue:get_type_state(Q) of
#{nodes := Nodes} ->
Nodes;
_ ->
[get_leader_node(Q)]
end.

% operator_policy

Expand Down
16 changes: 2 additions & 14 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -463,20 +463,8 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
-spec queue_topology(amqqueue:amqqueue()) ->
{Leader :: node() | none, Replicas :: [node(),...]}.
queue_topology(Q) ->
Leader = case amqqueue:get_pid(Q) of
{_RaName, Node} ->
Node;
none ->
none;
Pid ->
node(Pid)
end,
Replicas = case amqqueue:get_type_state(Q) of
#{nodes := Nodes} ->
Nodes;
_ ->
[Leader]
end,
Leader = amqqueue:get_leader_node(Q),
Replicas = amqqueue:get_nodes(Q),
{Leader, Replicas}.

decode_exchange({map, KVList}) ->
Expand Down
66 changes: 38 additions & 28 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
-export([update/2, store_queue/1, update_decorators/2, policy_changed/2]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_match/2, is_in_virtual_host/2]).
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([is_replicable/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
list_local_stream_queues/0, list_stream_queues_on/1,
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
Expand Down Expand Up @@ -150,11 +150,7 @@ filter_pid_per_type(QPids) ->

-spec stop(rabbit_types:vhost()) -> 'ok'.
stop(VHost) ->
%% Classic queues
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop(VHost),
rabbit_quorum_queue:stop(VHost).
rabbit_queue_type:stop(VHost).

-spec start([amqqueue:amqqueue()]) -> 'ok'.

Expand Down Expand Up @@ -424,14 +420,16 @@ rebalance(Type, VhostSpec, QueueSpec) ->
%% We have not yet acquired the rebalance_queues global lock.
maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec).

%% TODO: classic queues do not support rebalancing, it looks like they are simply
%% filtered out with is_replicable(Q). Maybe error instead?
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
rabbit_log:info("Starting queue rebalance operation: '~ts' for vhosts matching '~ts' and queues matching '~ts'",
[Type, VhostSpec, QueueSpec]),
Running = rabbit_maintenance:filter_out_drained_nodes_consistent_read(rabbit_nodes:list_running()),
NumRunning = length(Running),
ToRebalance = [Q || Q <- list(),
filter_per_type(Type, Q),
is_replicated(Q),
is_replicable(Q),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)],
NumToRebalance = length(ToRebalance),
Expand Down Expand Up @@ -459,10 +457,20 @@ filter_per_type(stream, Q) ->
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).

rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue;
rebalance_module(Q) when ?amqqueue_is_stream(Q) ->
rabbit_stream_queue.
%% TODO: note that it can return {error, not_supported}.
%% this will result in a badmatch. However that's fine
%% for now because the original function will fail with
%% bad clause if called with classical queue.
%% The assumption is all non-replicated queues
%% are filtered before calling this with is_replicable/0
rebalance_module(Q) ->
case rabbit_queue_type:rebalance_module(Q) of
undefined ->
rabbit_log:error("Undefined rebalance module for queue type: ~s", [amqqueue:get_type(Q)]),
{error, not_supported};
RBModule ->
RBModule
end.

get_resource_name(#resource{name = Name}) ->
Name.
Expand All @@ -487,13 +495,19 @@ iterative_rebalance(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired) ->
maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)).

%% TODO: unfortunate part - UI bits mixed deep inside logic.
%% I will not be moving this inside queue type. Instead
%% an attempt to generate something more readable than
%% Other made.
column_name(rabbit_classic_queue) -> <<"Number of replicated classic queues">>;
column_name(rabbit_quorum_queue) -> <<"Number of quorum queues">>;
column_name(rabbit_stream_queue) -> <<"Number of streams">>;
column_name(Other) -> Other.
column_name(TypeModule) ->
Alias = rabbit_queue_type:short_alias_of(TypeModule),
<<"Number of \"", Alias/binary, "\" queues">>.

maybe_migrate(ByNode, _, []) ->
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(?amqqueue_v2_field_type(Q)) end, Queues) end, ByNode),
ByNodeAndType = maps:map(fun(_Node, Queues) -> maps:groups_from_list(fun({_, Q, _}) -> column_name(amqqueue:get_type(Q)) end, Queues) end, ByNode),
CountByNodeAndType = maps:map(fun(_Node, Type) -> maps:map(fun (_, Qs)-> length(Qs) end, Type) end, ByNodeAndType),
{ok, maps:values(maps:map(fun(Node,Counts) -> [{<<"Node name">>, Node} | maps:to_list(Counts)] end, CountByNodeAndType))};
maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
Expand Down Expand Up @@ -1281,14 +1295,12 @@ list_durable() ->

-spec list_by_type(atom()) -> [amqqueue:amqqueue()].

list_by_type(classic) -> list_by_type(rabbit_classic_queue);
list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
list_by_type(stream) -> list_by_type(rabbit_stream_queue);
list_by_type(Type) ->
rabbit_db_queue:get_all_durable_by_type(Type).
list_by_type(TypeDescriptor) ->
TypeModule = rabbit_queue_type:discover(TypeDescriptor),
rabbit_db_queue:get_all_durable_by_type(TypeModule).

%% TODO: looks unused
-spec list_local_quorum_queue_names() -> [name()].

list_local_quorum_queue_names() ->
[ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
amqqueue:get_state(Q) =/= crashed,
Expand All @@ -1313,18 +1325,19 @@ list_stream_queues_on(Node) when is_atom(Node) ->
list_local_leaders() ->
[ Q || Q <- list(),
amqqueue:is_quorum(Q),
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()].
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader_node(Q) =:= node()].

-spec list_local_followers() -> [amqqueue:amqqueue()].
list_local_followers() ->
[Q
|| Q <- list(),
amqqueue:is_quorum(Q),
amqqueue:get_leader(Q) =/= node(),
amqqueue:get_leader_node(Q) =/= node(),
lists:member(node(), get_quorum_nodes(Q)),
rabbit_quorum_queue:is_recoverable(Q)
].

%% TODO: looks unused
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
list_local_quorum_queues_with_name_matching(Pattern) ->
[ Q || Q <- list_by_type(quorum),
Expand Down Expand Up @@ -1909,13 +1922,10 @@ forget_node_for_queue(Q) ->
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).

-spec is_replicated(amqqueue:amqqueue()) -> boolean().
-spec is_replicable(amqqueue:amqqueue()) -> boolean().

is_replicated(Q) when ?amqqueue_is_classic(Q) ->
false;
is_replicated(_Q) ->
%% streams and quorum queues are all replicated
true.
is_replicable(Q) ->
rabbit_queue_type:is_replicable(Q).

is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
Expand Down Expand Up @@ -1985,7 +1995,7 @@ filter_transient_queues_to_delete(Node) ->
amqqueue:qnode(Q) == Node andalso
not rabbit_process:is_process_alive(amqqueue:get_pid(Q))
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
andalso (not is_replicated(Q)
andalso (not is_replicable(Q)
orelse is_dead_exclusive(Q))
andalso amqqueue:get_type(Q) =/= rabbit_mqtt_qos0_queue
end.
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_boot_steps.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
%% README: https://github.com/rabbitmq/internals/blob/master/rabbit_boot_process.md
%%

-module(rabbit_boot_steps).

Expand Down
17 changes: 12 additions & 5 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1279,11 +1279,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
?INCR_STATS(queue_stats, QueueName, 1, get_empty, State),
{reply, #'basic.get_empty'{}, State#ch{queue_states = QueueStates}};
{error, {unsupported, single_active_consumer}} ->
rabbit_misc:protocol_error(
resource_locked,
"cannot obtain access to locked ~ts. basic.get operations "
"are not supported by quorum queues with single active consumer",
[rabbit_misc:rs(QueueName)]);
rabbit_amqqueue:with_or_die(QueueName, fun unsupported_single_active_consumer_error/1);
{error, Reason} ->
%% TODO add queue type to error message
rabbit_misc:protocol_error(internal_error,
Expand Down Expand Up @@ -1996,6 +1992,7 @@ foreach_per_queue(_F, [], Acc) ->
foreach_per_queue(F, [#pending_ack{tag = CTag,
queue = QName,
msg_id = MsgId}], Acc) ->
%% TODO: fix this abstraction leak
%% quorum queue, needs the consumer tag
F({QName, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
Expand Down Expand Up @@ -2023,6 +2020,7 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun (#pending_ack{tag = CTag}, Acc) when is_integer(CTag) ->
%% TODO: fix absctraction leak
%% Quorum queues use integer CTags
%% classic queues use binaries
%% Quorum queues do not interact
Expand Down Expand Up @@ -2787,3 +2785,12 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->

is_global_qos_permitted() ->
rabbit_deprecated_features:is_permitted(global_qos).

-spec unsupported_single_active_consumer_error(amqqueue:amqqueue()) -> no_return().
unsupported_single_active_consumer_error(Q) ->
rabbit_misc:protocol_error(
resource_locked,
"cannot obtain access to locked ~ts. basic.get operations "
"are not supported by ~p queues with single active consumer",
[rabbit_misc:rs(amqqueue:get_name(Q)),
rabbit_queue_type:short_alias_of(amqqueue:get_type(Q))]).
56 changes: 54 additions & 2 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,26 @@
send_drained_credit_api_v1/4,
send_credit_reply/7]).

-export([policy_apply_to_name/0,
stop/1,
list_with_minimum_quorum/0,
drain/1,
revive/0,
queue_vm_stats_sups/0,
queue_vm_ets/0]).

-export([validate_policy/1]).

-rabbit_boot_step(
{rabbit_classic_queue_type,
[{description, "Classic queue: queue type"},
{mfa, {rabbit_registry, register,
[queue, <<"classic">>, ?MODULE]}},
{cleanup, {rabbit_registry, unregister,
[queue, <<"classic">>]}},
{requires, rabbit_registry},
{enables, ?MODULE}]}).

-rabbit_boot_step(
{?MODULE,
[{description, "Deprecated queue-master-locator support."
Expand All @@ -74,7 +92,7 @@
[policy_validator, <<"queue-master-locator">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[operator_policy_validator, <<"queue-master-locator">>, ?MODULE]}},
{requires, rabbit_registry},
{requires, [rabbit_classic_queue_type]},
{enables, recovery}]}).

validate_policy(Args) ->
Expand Down Expand Up @@ -590,7 +608,11 @@ capabilities() ->
false -> []
end,
consumer_arguments => [<<"x-priority">>],
server_named => true}.
server_named => true,
rebalance_module => undefined,
can_redeliver => false,
is_replicable => false
}.

notify_decorators(Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
Expand Down Expand Up @@ -678,3 +700,33 @@ send_credit_reply(Pid, QName, Ctag, DeliveryCount, Credit, Available, Drain) ->

send_queue_event(Pid, QName, Event) ->
gen_server:cast(Pid, {queue_event, QName, Event}).

policy_apply_to_name() ->
<<"classic_queues">>.

stop(VHost) ->
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop(VHost).

list_with_minimum_quorum() ->
[].

drain(_TransferCandidates) ->
ok.

revive() ->
ok.

queue_vm_stats_sups() ->
{[queue_procs], [rabbit_vm:all_vhosts_children(rabbit_amqqueue_sup_sup)]}.

%% return nothing because of this line in rabbit_vm:
%% {msg_index, MsgIndexETS + MsgIndexProc},
%% it mixes procs and ets,
%% TODO: maybe instead of separating sups and ets
%% I need vm_memory callback that just
%% returns proplist? And rabbit_vm calculates
%% Other as usual by substraction.
queue_vm_ets() ->
{[], []}.
9 changes: 2 additions & 7 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1045,16 +1045,11 @@ list_queues() ->

queue_definition(Q) ->
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
Type = case amqqueue:get_type(Q) of
rabbit_classic_queue -> classic;
rabbit_quorum_queue -> quorum;
rabbit_stream_queue -> stream;
T -> T
end,
TypeModule = amqqueue:get_type(Q),
#{
<<"vhost">> => VHost,
<<"name">> => Name,
<<"type">> => Type,
<<"type">> => rabbit_registry:lookup_type_name(queue, TypeModule),
<<"durable">> => amqqueue:is_durable(Q),
<<"auto_delete">> => amqqueue:is_auto_delete(Q),
<<"arguments">> => rabbit_misc:amqp_table(amqqueue:get_arguments(Q))
Expand Down
11 changes: 1 addition & 10 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -537,16 +537,7 @@ redeliver0(#pending{delivery = Msg0,
[rabbit_amqqueue:name()].
clients_redeliver(Qs, QTypeState) ->
lists:filter(fun(Q) ->
case rabbit_queue_type:module(Q, QTypeState) of
{ok, rabbit_quorum_queue} ->
% If #enqueue{} Raft command does not get applied
% rabbit_fifo_client will resend.
true;
{ok, rabbit_stream_queue} ->
true;
_ ->
false
end
rabbit_queue_type:can_redeliver(Q, QTypeState)
end, Qs).

maybe_set_timer(#state{timer = TRef} = State)
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_global_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
end,
counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num).

messages_dead_lettered_confirmed(rabbit_quorum_queue, at_least_once, Num) ->
counters:add(fetch(rabbit_quorum_queue, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).
messages_dead_lettered_confirmed(QTypeModule, at_least_once, Num) ->
counters:add(fetch(QTypeModule, at_least_once), ?MESSAGES_DEAD_LETTERED_CONFIRMED, Num).

fetch(Protocol) ->
persistent_term:get({?MODULE, Protocol}).
Expand Down
Loading
Loading