Skip to content
Draft

Ra v3 #13885

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0312b19
Ra v3
kjnilsson May 12, 2025
48b9407
ra v3 wip
kjnilsson Sep 9, 2025
ae4483b
QQ: implement snapshot_installed/4 callback
kjnilsson Sep 16, 2025
8edccae
QQ: add v8 modules
kjnilsson Sep 22, 2025
283d318
remove no longer needed test
kjnilsson Sep 24, 2025
1ed1ff2
QQ: remove use of rabbit_fifo_index
kjnilsson Sep 26, 2025
ddd32c6
QQ: track discarded bytes and take snapshots based on that.
kjnilsson Oct 3, 2025
9c55f28
QQ: fix test affected by automated rename of rabbit_fifo:apply/3
kjnilsson Oct 6, 2025
a29c9dd
QQ: Formatting and test flake improvement
kjnilsson Oct 6, 2025
c27dbec
QQ: optimise memory for message references.
kjnilsson Oct 7, 2025
ee9043b
QQ: fix effects ordering
kjnilsson Oct 10, 2025
41cef46
minor refactoring
kjnilsson Oct 14, 2025
c57c47a
QQ: allow unlimited returns.
kjnilsson Oct 17, 2025
2ff1b30
QQ: Better noconnection handling
kjnilsson Oct 24, 2025
7a8fc1c
QQ strict priority queue
kjnilsson Nov 7, 2025
3c38b14
refactor how tests get message counts
kjnilsson Nov 10, 2025
74e06db
QQ: Make rabbit_fifo_client:stat/2 backwards compatible.
kjnilsson Nov 11, 2025
235bd37
QQ: record lengths of each priority queue
kjnilsson Nov 12, 2025
417a2d9
Redeliver all pending consumer messages in correct order
ansd Nov 13, 2025
67a3b13
Redeliver in correct order
ansd Nov 13, 2025
eb69cd7
QQ: display per priority message counts in mgmt UI.
kjnilsson Nov 14, 2025
fe17615
Fix extracting message header
ansd Nov 14, 2025
b4f44ad
Tiny performance improvements
ansd Nov 14, 2025
4399315
Optimise get_lowest_index/1
ansd Nov 14, 2025
c104dff
Include DLX consumer in smallest_raft_index/1
ansd Nov 14, 2025
5a56c37
fix rabbit_fifo_prop_SUITE
kjnilsson Nov 18, 2025
27de8d6
QQ: wip - improve message expiry
kjnilsson Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator

PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q rabbit_fifo_pq
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_size_limit metadata_store_migration
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor

Expand Down
2,011 changes: 1,232 additions & 779 deletions deps/rabbit/src/rabbit_fifo.erl

Large diffs are not rendered by default.

69 changes: 62 additions & 7 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@

-define(DELIVERY_SEND_MSG_OPTS, [local, ra_event]).

%% Constants for packed msg references, where both the raft index and the
%% message size are packed into a single immediate integer term.
%%
%% Layout, least significant bits first:
%%   bits 0..43  - raft index   (?PACKED_IDX_BITS bits)
%%   bits 44..58 - message size (?PACKED_SZ_BITS bits)
%%
%% 59 bits in total, as immediate ints are signed
-define(PACKED_MAX, 16#7FFF_FFFF_FFFF_FFF).
%% index bits - enough for 2000 days at 100k indexes p/sec
-define(PACKED_IDX_BITS, 44).
-define(PACKED_IDX_MAX, 16#FFFF_FFFF_FFF).
-define(PACKED_SZ_BITS, 15). %% size
-define(PACKED_SZ_MAX, 16#7FFF). %% 15 bits

%% Packs a raft index and a size into one integer.
%% The result is only meaningful when Idx =< ?PACKED_IDX_MAX and
%% Sz =< ?PACKED_SZ_MAX; the macro itself performs no range check.
%% Arguments are parenthesised so that compound expressions expand correctly.
-define(PACK(Idx, Sz),
        ((Idx) bxor ((Sz) bsl ?PACKED_IDX_BITS))).
%% Extracts the raft index (the low ?PACKED_IDX_BITS bits).
-define(PACKED_IDX(PackedInt),
        ((PackedInt) band ?PACKED_IDX_MAX)).
%% Extracts the size (the ?PACKED_SZ_BITS bits above the index).
-define(PACKED_SZ(PackedInt),
        (((PackedInt) bsr ?PACKED_IDX_BITS) band ?PACKED_SZ_MAX)).

%% Guard-safe test that Int lies within the packed reference range.
-define(IS_PACKED(Int), ((Int) >= 0 andalso (Int) =< ?PACKED_MAX)).

-type optimised_tuple(A, B) :: nonempty_improper_list(A, B).

-type option(T) :: undefined | T.
Expand Down Expand Up @@ -57,7 +77,10 @@
-type msg_size() :: non_neg_integer().
%% the size in bytes of the msg payload

-type msg() :: optimised_tuple(ra:index(), msg_header()).
%% 60 bit integer, immediate
-type packed_msg() :: 0..?PACKED_MAX.

-type msg() :: packed_msg() | optimised_tuple(ra:index(), msg_header()).

-type delivery_msg() :: {msg_id(), {msg_header(), raw_msg()}}.
%% A tuple consisting of the message id, and the headered message.
Expand Down Expand Up @@ -105,6 +128,7 @@
%% once these many bytes have been written since the last checkpoint
%% we request a checkpoint irrespectively
-define(CHECK_MAX_BYTES, 128_000_000).
-define(SNAP_OUT_BYTES, 64_000_000).

-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit
Expand All @@ -129,9 +153,11 @@
lifetime = once :: once | auto,
priority = 0 :: integer()}).

-type consumer_status() :: up | cancelled | quiescing.

-record(consumer,
{cfg = #consumer_cfg{},
status = up :: up | suspected_down | cancelled | quiescing,
status = up :: consumer_status() | {suspected_down, consumer_status()},
next_msg_id = 0 :: msg_id(),
checked_out = #{} :: #{msg_id() => msg()},
%% max number of messages that can be sent
Expand Down Expand Up @@ -179,16 +205,43 @@
unused_3 = ?NIL
}).

%% Groups the message-holding fields: the priority queue of messages,
%% a running total and the queue of returned messages.
-record(messages,
{
%% messages held in a rabbit_fifo_pq structure
messages = rabbit_fifo_pq:new() :: rabbit_fifo_pq:state(),
%% message counter, starts at 0
%% NOTE(review): exactly which messages are counted is not visible here
messages_total = 0 :: non_neg_integer(),
% queue of returned msg_in_ids
% NOTE(review): the original comment ended mid-sentence ("when checking out
% it picks from"); presumably checkout picks from this queue first - confirm
returns = lqueue:new() :: lqueue:lqueue(term())
}).

%% State kept for a dead-letter (DLX) consumer.
-record(dlx_consumer,
%% pid of the consumer process
{pid :: pid(),
%% NOTE(review): presumably the max number of checked out messages - confirm
prefetch :: non_neg_integer(),
%% messages currently checked out, keyed by msg_id; each value pairs
%% the dead-letter reason with the message
checked_out = #{} :: #{msg_id() =>
optimised_tuple(rabbit_dead_letter:reason(), msg())},
%% starts at 0
%% NOTE(review): presumably the msg_id assigned to the next delivery - confirm
next_msg_id = 0 :: msg_id()}).

%% Dead-letter state: an optional consumer, the queue of discarded
%% messages, their raft indexes and byte counters.
-record(rabbit_fifo_dlx,
%% undefined when there is no consumer
{consumer :: option(#dlx_consumer{}),
%% Queue of dead-lettered messages.
discards = lqueue:new() :: lqueue:lqueue(optimised_tuple(rabbit_dead_letter:reason(), msg())),
%% Raft indexes of messages in both discards queue and dlx_consumer's checked_out map
%% so that we get the smallest ra index in O(1).
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
%% byte counters, both start at 0
%% NOTE(review): presumably bytes in discards vs bytes checked out - confirm
msg_bytes = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer()}).

-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
messages = rabbit_fifo_q:new() :: rabbit_fifo_q:state(),
messages = rabbit_fifo_pq:new() :: rabbit_fifo_pq:state(),
messages_total = 0 :: non_neg_integer(),
% queue of returned msg_in_ids - when checking out it picks from
returns = lqueue:new() :: lqueue:lqueue(term()),
% a counter of enqueues - used to trigger shadow copy points
% discarded bytes - a counter that is incremented every time a command
% is processed that does not need to be kept (live indexes).
% Approximate, used for triggering snapshots
% reset to 0 when release_cursor gets stored
enqueue_count = 0 :: non_neg_integer(),
discarded_bytes = 0,
% a map containing all the live processes that have ever enqueued
% a message to this queue
enqueuers = #{} :: #{pid() => #enqueuer{}},
Expand All @@ -197,19 +250,21 @@
% rabbit_fifo_index can be slow when calculating the smallest
% index when there are large gaps but should be faster than gb_trees
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
unused_0 = ?NIL,
unused_1 = ?NIL,
% consumers need to reflect consumer state at time of snapshot
consumers = #{} :: #{consumer_key() => consumer()},
% consumers that require further service are queued here
service_queue = priority_queue:new() :: priority_queue:q(),
%% state for at-least-once dead-lettering
dlx = rabbit_fifo_dlx:init() :: rabbit_fifo_dlx:state(),
dlx = #rabbit_fifo_dlx{} :: #rabbit_fifo_dlx{},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
%% one is picked if active consumer is cancelled or dies
%% used only when single active consumer is on
waiting_consumers = [] :: [{consumer_key(), consumer()}],
%% records the timestamp whenever the queue was last considered
%% active in terms of consumer activity
last_active :: option(non_neg_integer()),
msg_cache :: option({ra:index(), raw_msg()}),
unused_2 = ?NIL
Expand Down
70 changes: 32 additions & 38 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ enqueue(QName, Correlation, Msg,
{reject_publish, State0};
{error, {shutdown, delete}} ->
?LOG_DEBUG("~ts: QQ ~ts tried to register enqueuer during delete shutdown",
[?MODULE, rabbit_misc:rs(QName)]),
[?MODULE, rabbit_misc:rs(QName)]),
{reject_publish, State0};
{timeout, _} ->
{reject_publish, State0};
Err ->
?LOG_DEBUG("~ts: QQ ~ts error when registering enqueuer ~p",
[?MODULE, rabbit_misc:rs(QName), Err]),
[?MODULE, rabbit_misc:rs(QName), Err]),
exit(Err)
end;
enqueue(_QName, _Correlation, _Msg,
Expand Down Expand Up @@ -373,24 +373,12 @@ checkout(ConsumerTag, CreditMode, #{} = Meta,
is_tuple(CreditMode) ->
Servers = sorted_servers(State0),
ConsumerId = consumer_id(ConsumerTag),
Spec = case rabbit_fifo:is_v4() of
true ->
case CreditMode of
{simple_prefetch, 0} ->
{auto, {simple_prefetch,
?UNLIMITED_PREFETCH_COUNT}};
_ ->
{auto, CreditMode}
end;
false ->
case CreditMode of
{credited, _} ->
{auto, 0, credited};
{simple_prefetch, 0} ->
{auto, ?UNLIMITED_PREFETCH_COUNT, simple_prefetch};
{simple_prefetch, Num} ->
{auto, Num, simple_prefetch}
end
Spec = case CreditMode of
{simple_prefetch, 0} ->
{auto, {simple_prefetch,
?UNLIMITED_PREFETCH_COUNT}};
_ ->
{auto, CreditMode}
end,
Cmd = rabbit_fifo:make_checkout(ConsumerId, Spec, Meta),
%% ???
Expand All @@ -415,13 +403,12 @@ checkout(ConsumerTag, CreditMode, #{} = Meta,
end
end,
ConsumerKey = maps:get(key, Reply, ConsumerId),
SDels = maps:update_with(
ConsumerTag,
fun (C) -> C#consumer{ack = Ack} end,
#consumer{key = ConsumerKey,
last_msg_id = LastMsgId,
ack = Ack},
CDels0),
SDels = maps:update_with(ConsumerTag,
fun (C) -> C#consumer{ack = Ack} end,
#consumer{key = ConsumerKey,
last_msg_id = LastMsgId,
ack = Ack},
CDels0),
{ok, Reply, State0#state{leader = Leader,
consumers = SDels}};
Err ->
Expand Down Expand Up @@ -534,10 +521,16 @@ stat(Leader) ->
stat(Leader, Timeout) ->
%% short timeout as we don't want to spend too long if it is going to
%% fail anyway
case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of
{ok, {_, {R, C}}, _} -> {ok, R, C};
{error, _} = Error -> Error;
{timeout, _} = Error -> Error
%% TODO: the overview is too large to be super efficient
%% but we use it for backwards compatibility
case ra:member_overview(Leader, Timeout) of
{ok, #{machine := #{num_ready_messages := R,
num_checked_out := C}}, _} ->
{ok, R, C};
{error, _} = Error ->
Error;
{timeout, _} = Error ->
Error
end.

update_machine_state(Server, Conf) ->
Expand Down Expand Up @@ -994,13 +987,14 @@ send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
next_seq = Seq,
cfg = #cfg{soft_limit = SftLmt}} = State) ->
ok = case rabbit_fifo:is_return(Command) of
true ->
%% returns are sent to the aux machine for pre-evaluation
ra:cast_aux_command(Server, {Command, Seq, self()});
_ ->
ra:pipeline_command(Server, Command, Seq, Priority)
end,
% ok = case rabbit_fifo:is_return(Command) of
% true ->
% %% returns are sent to the aux machine for pre-evaluation
% ra:cast_aux_command(Server, {Command, Seq, self()});
% _ ->
% ra:pipeline_command(Server, Command, Seq, Priority)
% end,
ok = ra:pipeline_command(Server, Command, Seq, Priority),
State#state{pending = Pending#{Seq => {Correlation, Command}},
next_seq = Seq + 1,
slow = map_size(Pending) >= SftLmt}.
Expand Down
13 changes: 9 additions & 4 deletions deps/rabbit/src/rabbit_fifo_dlx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
-module(rabbit_fifo_dlx).

-include("rabbit_fifo_dlx.hrl").
-include("rabbit_fifo.hrl").
-include("rabbit_fifo_v7.hrl").
-include_lib("kernel/include/logger.hrl").
-compile({no_auto_import, [apply/3]}).

Expand All @@ -26,7 +26,8 @@
dehydrate/1,
stat/1,
update_config/4,
smallest_raft_index/1
smallest_raft_index/1,
live_indexes/1
]).

-record(checkout, {consumer :: pid(),
Expand Down Expand Up @@ -164,7 +165,7 @@ discard(Msgs0, Reason, {at_most_once, {Mod, Fun, Args}}, State) ->
Cmd = maps:get(Idx, Lookup),
%% ensure header delivery count
%% is copied to the message container
annotate_msg(H, rabbit_fifo:get_msg(Cmd))
annotate_msg(H, rabbit_fifo:get_msg_from_cmd(Cmd))
end || ?MSG(Idx, H) <- Msgs0],
[{mod_call, Mod, Fun, Args ++ [Reason, Msgs]}]
end},
Expand Down Expand Up @@ -237,7 +238,7 @@ delivery_effects(CPid, Msgs0) ->
Msgs = lists:zipwith(
fun (Cmd, {Reason, H, MsgId}) ->
{MsgId, {Reason,
annotate_msg(H, rabbit_fifo:get_msg(Cmd))}}
annotate_msg(H, rabbit_fifo:get_msg_from_cmd(Cmd))}}
end, Log, RsnIds),
[{send_msg, CPid, {dlx_event, self(), {dlx_delivery, Msgs}}, [cast]}]
end}].
Expand Down Expand Up @@ -365,5 +366,9 @@ dehydrate(State) ->
%% Returns whatever rabbit_fifo_index:smallest/1 reports for the
%% `ra_indexes' field of the DLX state.
smallest_raft_index(#?MODULE{} = State) ->
    RaIndexes = State#?MODULE.ra_indexes,
    rabbit_fifo_index:smallest(RaIndexes).

%% Returns the list of raft indexes held in the `ra_indexes' field of the
%% DLX state, as produced by rabbit_fifo_index:indexes/1.
-spec live_indexes(state()) -> [ra:index()].
live_indexes(#?MODULE{} = State) ->
    RaIndexes = State#?MODULE.ra_indexes,
    rabbit_fifo_index:indexes(RaIndexes).

%% Local shorthand that delegates to rabbit_fifo:annotate_msg/2.
annotate_msg(Header, Message) ->
    rabbit_fifo:annotate_msg(Header, Message).
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_fifo_dlx.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
-record(dlx_consumer,
{pid :: pid(),
prefetch :: non_neg_integer(),
checked_out = #{} :: #{msg_id() => optimised_tuple(rabbit_dead_letter:reason(), msg())},
checked_out = #{} :: #{msg_id() =>
optimised_tuple(rabbit_dead_letter:reason(), msg())},
next_msg_id = 0 :: msg_id()}).

-record(rabbit_fifo_dlx,
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbit/src/rabbit_fifo_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
delete/2,
size/1,
smallest/1,
indexes/1,
map/2,
to_list/1
]).
Expand Down Expand Up @@ -90,6 +91,10 @@ size(#?MODULE{data = Data}) ->
smallest(#?MODULE{smallest = Smallest}) ->
Smallest.

%% Returns every key of the index's backing map. The result comes straight
%% from maps:keys/1, so callers should not rely on its order.
-spec indexes(state()) -> [ra:index()].
indexes(#?MODULE{} = State) ->
    maps:keys(State#?MODULE.data).

-spec map(fun(), state()) -> state().
map(F, #?MODULE{data = Data} = State) ->
State#?MODULE{data = maps:map(F, Data)}.
Expand Down
Loading
Loading