Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error({'binding_invalid', string(), [any()]}) |
%% inner_fun() result
rabbit_types:error(rabbit_types:amqp_error()).
rabbit_types:error(rabbit_types:amqp_error()) |
rabbit_khepri:timeout_error().
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
fun((rabbit_types:exchange(),
Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,9 @@ binding_action(Action, Binding, Username, ConnPid) ->
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error, "Could not ~s binding due to timeout", [Action]);
ok ->
ok
end.
Expand Down
15 changes: 10 additions & 5 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ exists_in_khepri(#binding{source = SrcName,
Errs ->
Errs
end
end) of
end, ro) of
{ok, not_found} -> false;
{ok, Set} -> sets:is_element(Binding, Set);
Errs -> not_found_errs_in_khepri(not_found(Errs, SrcName, DstName))
Expand Down Expand Up @@ -150,8 +150,9 @@ not_found({[], []}, SrcName, DstName) ->
Binding :: rabbit_types:binding(),
Src :: rabbit_types:binding_source(),
Dst :: rabbit_types:binding_destination(),
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {error, Reason :: any()}.
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
ChecksErrReason :: any(),
Ret :: ok | {error, ChecksErrReason} | rabbit_khepri:timeout_error().
%% @doc Writes a binding if it doesn't exist already and passes the validation in
%% `ChecksFun' i.e. exclusive access
%%
Expand Down Expand Up @@ -255,8 +256,12 @@ serial_in_khepri(true, X) ->
Binding :: rabbit_types:binding(),
Src :: rabbit_types:binding_source(),
Dst :: rabbit_types:binding_destination(),
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {ok, rabbit_binding:deletions()} | {error, Reason :: any()}.
ChecksFun :: fun((Src, Dst) -> ok | {error, ChecksErrReason}),
ChecksErrReason :: any(),
Ret :: ok |
{ok, rabbit_binding:deletions()} |
{error, ChecksErrReason} |
rabbit_khepri:timeout_error().
%% @doc Deletes a binding record from the database if it passes the validation in
%% `ChecksFun'. It also triggers the deletion of auto-delete exchanges if needed.
%%
Expand Down
17 changes: 11 additions & 6 deletions deps/rabbit/src/rabbit_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -883,12 +883,17 @@ add_binding(VHost, Binding, ActingUser) ->
rv(VHost, DestType, destination, Binding), ActingUser).

add_binding_int(Binding, Source, Destination, ActingUser) ->
rabbit_binding:add(
#binding{source = Source,
destination = Destination,
key = maps:get(routing_key, Binding, undefined),
args = args(maps:get(arguments, Binding, undefined))},
ActingUser).
case rabbit_binding:add(
#binding{source = Source,
destination = Destination,
key = maps:get(routing_key, Binding, undefined),
args = args(maps:get(arguments, Binding, undefined))},
ActingUser) of
ok ->
ok;
{error, _} = Err ->
throw(Err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't object to this change but throwing on the definition import path will halt the entire process. Perhaps we already do that in other places :( It's a good question whether we should continue or halt, "some definitions" can be better than none.

But for cases where the definitions are imported from an external tool on boot (and that happens too early), failing makes more sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was originally thinking we should refactor rabbit_definitions to import all definitions in one big transaction. Then the side effects like starting vhost or queue processes would happen after the transaction succeeded. It would be a really, really big refactor though, and definitions import is idempotent, so if an import fails partially, all you should need to do is re-run it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do end up catching these throws here

_ = try
WorkPoolFun(M)
catch {error, E} -> gatherer:in(Gatherer, {error, E});
_:E:Stacktrace ->
rabbit_log:debug("Definition import: a work pool operation has thrown an exception ~st, stacktrace: ~p",
[E, Stacktrace]),
gatherer:in(Gatherer, {error, E})
end,
gatherer:finish(Gatherer)

and convert it into an {error, _} but it will halt the import after the first concurrent_for_all/4 throws. It won't be a crash+stacktrace in the logs at least 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the metadata store is timing out, most likely it won't be the only failing operation. I wonder if this is one of the cases where we should retry the operation. Like if it fails during boot, retry a few times. Or just abort the boot process?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a bullet for looking into startup specifically to my todo list. It looks like currently we crash when trying to register projections if you start in a minority during rabbit_khepri:setup/0 which comes before definitions import. (Start a 3-node cluster, enable khepri_db, stop the cluster, start one node.) I think we should have retries there so you have some time to form a majority before we error out and fail the boot. There are also hard matches on ok for when we insert default data

%% Seeds the default virtual host, default user, that user's tags and that
%% user's permissions on the default virtual host.
%%
%% Every write below is hard-matched on `ok', so any other return value
%% (for example an `{error, _}' tuple) raises a `badmatch' and the
%% remaining steps do not run.
insert_default_data() ->
%% Look up the configured defaults. The two application:get_env/1 lookups are
%% asserted with `{ok, _}', so a missing key also raises a `badmatch'.
DefaultUser = get_default_data_param(default_user),
DefaultPass = get_default_data_param(default_pass),
{ok, DefaultTags} = application:get_env(default_user_tags),
DefaultVHost = get_default_data_param(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
%% Coerce every value handed to the calls below into a binary.
DefaultUserBin = rabbit_data_coercion:to_binary(DefaultUser),
DefaultPassBin = rabbit_data_coercion:to_binary(DefaultPass),
DefaultVHostBin = rabbit_data_coercion:to_binary(DefaultVHost),
DefaultConfigurePermBin = rabbit_data_coercion:to_binary(DefaultConfigurePerm),
DefaultWritePermBin = rabbit_data_coercion:to_binary(DefaultWritePerm),
DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm),
%% Writes, in order: virtual host, user, user tags, user permissions.
%% All of them are performed as ?INTERNAL_USER.
ok = rabbit_vhost:add(DefaultVHostBin, <<"Default virtual host">>, [], ?INTERNAL_USER),
ok = rabbit_auth_backend_internal:add_user(
DefaultUserBin,
DefaultPassBin,
?INTERNAL_USER
),
ok = rabbit_auth_backend_internal:set_tags(DefaultUserBin, DefaultTags,
?INTERNAL_USER),
ok = rabbit_auth_backend_internal:set_permissions(DefaultUserBin,
DefaultVHostBin,
DefaultConfigurePermBin,
DefaultWritePermBin,
DefaultReadPermBin,
?INTERNAL_USER),
ok.

so I think keeping definitions imports and default data behaving the same is probably ok for now. These operations happen right after we start up Khepri and send commands like register_projections so if we get to this part of the boot we have already been in a majority and probably still are, unless some of the nodes disappear right after boot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we added retries at some point to wait for other nodes, as we do for Mnesia. I'll have a look

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just wait for the leader, so I opened a new PR waiting for the projections: #11741

end.

%% Reads the `destination_type' entry of the binding definition map
%% (falling back to `undefined' when the key is absent) and coerces it
%% with rabbit_data_coercion:to_atom/1.
dest_type(Binding) ->
    RawType = maps:get(destination_type, Binding, undefined),
    rabbit_data_coercion:to_atom(RawType).
Expand Down
11 changes: 11 additions & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@
clear_forced_metadata_store/0]).
-endif.

-type timeout_error() :: khepri:error(timeout).
%% Commands like 'put'/'delete' etc. might time out in Khepri. It might take
%% the leader longer to apply the command and reply to the caller than the
%% configured timeout. This error is easy to reproduce - a cluster which is
%% only running a minority of nodes will consistently return `{error, timeout}'
%% for commands until the cluster majority can be re-established. Commands
%% returning `{error, timeout}' are a likely (but not certain) indicator that
%% the node which submitted the command is running in a minority.

-export_type([timeout_error/0]).

-compile({no_auto_import, [get/1, get/2, nodes/0]}).

-define(RA_SYSTEM, coordination).
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/test/cluster_minority_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ groups() ->
open_channel,
declare_exchange,
declare_binding,
delete_binding,
declare_queue,
publish_to_exchange,
publish_and_consume_to_local_classic_queue,
Expand Down Expand Up @@ -85,7 +86,7 @@ init_per_group(Group, Config0) when Group == client_operations;
{skip, _} ->
Config1;
_ ->
%% Before partitioning the cluster, create a policy and queue that can be used in
%% Before partitioning the cluster, create resources that can be used in
%% the test cases. They're needed for delete and consume operations, which can list
%% them but fail to operate anything else.
%%
Expand All @@ -95,6 +96,10 @@ init_per_group(Group, Config0) when Group == client_operations;
%% To be used in consume_from_queue
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
%% To be used in delete_binding
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
source = <<"amq.direct">>,
routing_key = <<"binding-to-be-deleted">>}),

%% Lower the default Khepri command timeout. By default this is set
%% to 30s in `rabbit_khepri:setup/1' which makes the cases in this
Expand Down Expand Up @@ -160,6 +165,14 @@ declare_binding(Config) ->
source = <<"amq.direct">>,
routing_key = <<"key">>})).

%% Tries to remove the `binding-to-be-deleted' exchange-to-exchange binding
%% (amq.direct -> amq.fanout) through the first node of the cluster and
%% expects the call to exit because the server closes the connection with
%% reply code 541 (presumably AMQP `internal-error' - confirm against the
%% channel's timeout handling).
delete_binding(Config) ->
    [Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Node),
    Unbind = #'exchange.unbind'{destination = <<"amq.fanout">>,
                                source = <<"amq.direct">>,
                                routing_key = <<"binding-to-be-deleted">>},
    ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
                amqp_channel:call(Chan, Unbind)).

declare_queue(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,8 @@ add_super_stream_binding(VirtualHost,
{error, {binding_invalid, rabbit_misc:format(Fmt, Args)}};
{error, #amqp_error{} = Error} ->
{error, {internal_error, rabbit_misc:format("~tp", [Error])}};
{error, timeout} ->
{error, {internal_error, "failed to add binding due to a timeout"}};
ok ->
ok
end.
Expand Down