Skip to content

Commit

Permalink
Use initial_machine_version config to avoid initialising
Browse files Browse the repository at this point in the history
from rabbit_fifo version 0.

QQ: avoid deadlock in queue federation.

When processing the queue federation startup event, the process
may call back into the ra process, causing a deadlock. In this
case we spawn a temporary process to avoid this.
  • Loading branch information
kjnilsson committed Jan 15, 2025
1 parent 4c30372 commit 562756c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 17 deletions.
4 changes: 2 additions & 2 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ erlang_package.hex_package(
name = "ra",
build_file = "@rabbitmq-server//bazel:BUILD.ra",
pkg = "ra",
sha256 = "a7eae50b0c1c0be4daf9b7ee97be796e6fda372ea6b2047c3aeac89cdc2011df",
version = "2.16.0-pre.10",
sha256 = "cfc0dbe5ebbd54f44081f95ea6a1daeb28a89df82aa9baa234f68abbb36bdc67",
version = "2.16.0-pre.11",
)

erlang_package.git_package(
Expand Down
24 changes: 18 additions & 6 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,14 @@ start_cluster(Q) ->
NewQ1 = amqqueue:set_type_state(NewQ0,
#{nodes => [LeaderNode | FollowerNodes]}),

Versions = erpc:multicall(FollowerNodes, rabbit_fifo, version, []),
MinVersion = lists:min([rabbit_fifo:version() | Versions]),

rabbit_log:debug("Will start up to ~w replicas for quorum ~ts with leader on node '~ts'",
[QuorumSize, rabbit_misc:rs(QName), LeaderNode]),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
RaConfs = [make_ra_conf(NewQ, ServerId)
RaConfs = [make_ra_conf(NewQ, ServerId, voter, MinVersion)
|| ServerId <- members(NewQ)],

%% khepri projections on remote nodes are eventually consistent
Expand Down Expand Up @@ -544,6 +547,10 @@ spawn_deleter(QName) ->
delete(Q, false, false, <<"expired">>)
end).

%% Invokes notify_decorators/3 for the queue QName with the event Fun and Args.
%%
%% The `startup' event is dispatched from a freshly spawned temporary process:
%% while handling the queue federation startup event the callee may call back
%% into the ra process, which would deadlock if the notification ran inline.
%% Every other event is still run directly in the calling (ra) process.
spawn_notify_decorators(QName, startup = Fun, Args) ->
%% fire-and-forget: the pid returned by spawn/1 is the clause's result
spawn(fun() ->
notify_decorators(QName, Fun, Args)
end);
spawn_notify_decorators(QName, Fun, Args) ->
%% run in ra process for now
%% old-style catch: an exception from notify_decorators/3 is turned into a
%% return value instead of propagating to the caller
catch notify_decorators(QName, Fun, Args).
Expand Down Expand Up @@ -1339,7 +1346,9 @@ add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) ->
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
Members = members(Q),
Conf = make_ra_conf(Q, ServerId, Membership),

MachineVersion = erpc:call(Node, rabbit_fifo, version, []),
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
ServerIdSpec =
Expand Down Expand Up @@ -1912,9 +1921,9 @@ format_ra_event(ServerId, Evt, QRef) ->
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.

make_ra_conf(Q, ServerId) ->
make_ra_conf(Q, ServerId, voter).
make_ra_conf(Q, ServerId, voter, rabbit_fifo:version()).

make_ra_conf(Q, ServerId, Membership) ->
make_ra_conf(Q, ServerId, Membership, MacVersion) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
?TICK_INTERVAL),
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
Expand All @@ -1923,10 +1932,12 @@ make_ra_conf(Q, ServerId, Membership) ->
quorum_min_checkpoint_interval,
?MIN_CHECKPOINT_INTERVAL),
make_ra_conf(Q, ServerId, TickTimeout,
SnapshotInterval, CheckpointInterval, Membership).
SnapshotInterval, CheckpointInterval,
Membership, MacVersion).

make_ra_conf(Q, ServerId, TickTimeout,
SnapshotInterval, CheckpointInterval, Membership) ->
SnapshotInterval, CheckpointInterval,
Membership, MacVersion) ->
QName = amqqueue:get_name(Q),
RaMachine = ra_machine(Q),
[{ClusterName, _} | _] = Members = members(Q),
Expand All @@ -1947,6 +1958,7 @@ make_ra_conf(Q, ServerId, TickTimeout,
log_init_args => LogCfg,
tick_timeout => TickTimeout,
machine => RaMachine,
initial_machine_version => MacVersion,
ra_event_formatter => Formatter}).

make_mutable_config(Q) ->
Expand Down
13 changes: 9 additions & 4 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_stream_coordinator).

Expand Down Expand Up @@ -495,7 +494,11 @@ start_coordinator_cluster() ->
Nodes = rabbit_nodes:list_reachable(),
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
true = Nodes =/= [],
case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
Versions = erpc:multicall(Nodes, ?MODULE, version, []),
MinVersion = lists:min([version() | Versions]),
case ra:start_cluster(?RA_SYSTEM,
[make_ra_conf(Node, Nodes, MinVersion)
|| Node <- Nodes]) of
{ok, Started, _} ->
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
Started;
Expand Down Expand Up @@ -813,7 +816,8 @@ maybe_resize_coordinator_cluster() ->
end).

add_member(Members, Node) ->
Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
ServerId = {?MODULE, Node},
case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
Expand Down Expand Up @@ -1255,7 +1259,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% Wraps a ra event Evt originating from ServerId in a
%% `{stream_coordinator_event, ServerId, Evt}' tuple. make_ra_conf registers
%% this function as the `ra_event_formatter' ({?MODULE, format_ra_event, []}).
format_ra_event(ServerId, Evt) ->
{stream_coordinator_event, ServerId, Evt}.

make_ra_conf(Node, Nodes) ->
make_ra_conf(Node, Nodes, MinMacVersion) ->
UId = ra:new_uid(ra_lib:to_binary(?MODULE)),
Formatter = {?MODULE, format_ra_event, []},
Members = [{?MODULE, N} || N <- Nodes],
Expand All @@ -1270,6 +1274,7 @@ make_ra_conf(Node, Nodes) ->
log_init_args => #{uid => UId},
tick_timeout => TickTimeout,
machine => {module, ?MODULE, #{}},
initial_machine_version => MinMacVersion,
ra_event_formatter => Formatter}.

filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamId,
Expand Down
15 changes: 11 additions & 4 deletions deps/rabbit/test/rabbit_fifo_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ init_per_testcase(TestCase, Config) ->
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
meck:new(rabbit_feature_flags, []),
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
meck:expect(rabbit_feature_flags, is_enabled, fun (_, _) -> true end),
ra_server_sup_sup:remove_all(?RA_SYSTEM),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
Expand Down Expand Up @@ -880,10 +881,16 @@ discard_next_delivery(ClusterName, State0, Wait) ->
end.

start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
{ok, Started, _} = ra:start_cluster(?RA_SYSTEM,
ClusterName#resource.name,
{module, rabbit_fifo, RaFifoConfig},
ServerIds),
UId = ra:new_uid(ra_lib:to_binary(ClusterName#resource.name)),
Confs = [#{id => Id,
uid => UId,
cluster_name => ClusterName#resource.name,
log_init_args => #{uid => UId},
initial_members => ServerIds,
initial_machine_version => rabbit_fifo:version(),
machine => {module, rabbit_fifo, RaFifoConfig}}
|| Id <- ServerIds],
{ok, Started, _} = ra:start_cluster(?RA_SYSTEM, Confs),
?assertEqual(length(Started), length(ServerIds)),
ok.

Expand Down
2 changes: 1 addition & 1 deletion rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dep_khepri = hex 0.16.0
dep_khepri_mnesia_migration = hex 0.7.1
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.5
dep_prometheus = hex 4.11.0
dep_ra = hex 2.16.0-pre.10
dep_ra = hex 2.16.0-pre.11
dep_ranch = hex 2.1.0
dep_recon = hex 2.5.6
dep_redbug = hex 2.0.7
Expand Down

0 comments on commit 562756c

Please sign in to comment.