Skip to content

Commit

Permalink
Merge branch 'main' into rabbitmq-server-7262
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin authored Feb 23, 2023
2 parents b0d2f94 + 2463933 commit fbe83e4
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 4 deletions.
16 changes: 15 additions & 1 deletion deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -743,8 +743,22 @@ end}.
{datatype, integer}
]}.

{mapping, "default_policies.operator.$id.classic_queues.ha_mode", "rabbit.default_policies.operator", [
{datatype, string}
]}.

{mapping, "default_policies.operator.$id.classic_queues.ha_params", "rabbit.default_policies.operator", [
{datatype, [integer, {list, string}]}
]}.

{translation, "rabbit.default_policies.operator", fun(Conf) ->
Props = rabbit_cuttlefish:aggregate_props(Conf, ["default_policies", "operator"]),
Props = rabbit_cuttlefish:aggregate_props(
Conf,
["default_policies", "operator"],
fun({["default_policies","operator",ID,"classic_queues"|T], V}) ->
{["default_policies","operator",ID|T],V};
(E) -> E
end),
Props1 = lists:map(
fun({K, Ss}) ->
{K,
Expand Down
13 changes: 10 additions & 3 deletions deps/rabbit/src/rabbit_cuttlefish.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@
-module(rabbit_cuttlefish).

-export([
aggregate_props/2
aggregate_props/2,
aggregate_props/3
]).

-type keyed_props() :: [{binary(), [{binary(), any()}]}].

-spec aggregate_props([{string(), any()}], [string()]) ->
keyed_props().
aggregate_props(Conf, Prefix) ->
aggregate_props(Conf, Prefix, fun(E) -> E end).

-spec aggregate_props([{string(), any()}], [string()], function()) ->
keyed_props().
aggregate_props(Conf, Prefix, KeyFun) ->
Pattern = Prefix ++ ["$id", "$_"],
PrefixLen = length(Prefix),
FlatList = lists:filtermap(
fun({K, V}) ->
fun(E) ->
{K, V} = KeyFun(E),
case cuttlefish_variable:is_fuzzy_match(K, Pattern) of
true -> {true, {lists:nthtail(PrefixLen, K), V}};
_ -> false
false -> false
end
end,
Conf
Expand Down
43 changes: 43 additions & 0 deletions deps/rabbit/src/rabbit_mirror_queue_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
-behaviour(rabbit_policy_merge_strategy).

-include("amqqueue.hrl").

Expand All @@ -15,6 +16,7 @@
initial_queue_node/2, suggested_queue_nodes/1, actual_queue_nodes/1,
is_mirrored/1, is_mirrored_ha_nodes/1,
update_mirrors/2, update_mirrors/1, validate_policy/1,
merge_policy_value/3,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, default_max_sync_throughput/0,
log_info/3, log_warning/3]).
Expand Down Expand Up @@ -46,6 +48,14 @@
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[operator_policy_validator, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[operator_policy_validator, <<"ha-params">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_merge_strategy, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_merge_strategy, <<"ha-params">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).

Expand Down Expand Up @@ -788,3 +798,36 @@ validate_pof(PromoteOnShutdown) ->
Mode -> {error, "ha-promote-on-failure must be "
"\"always\" or \"when-synced\", got ~tp", [Mode]}
end.

merge_policy_value(<<"ha-mode">>, Val, Val) ->
Val;
merge_policy_value(<<"ha-mode">>, <<"all">> = Val, _OpVal) ->
Val;
merge_policy_value(<<"ha-mode">>, _Val, <<"all">> = OpVal) ->
OpVal;
merge_policy_value(<<"ha-mode">>, <<"exactly">> = Val, _OpVal) ->
Val;
merge_policy_value(<<"ha-mode">>, _Val, <<"exactly">> = OpVal) ->
OpVal;
%% Both values are integers, both are ha-mode 'exactly'
merge_policy_value(<<"ha-params">>, Val, OpVal) when is_integer(Val)
andalso
is_integer(OpVal)->
if Val > OpVal ->
Val;
true ->
OpVal
end;
%% The integer values is of ha-mode 'exactly', the other is a list and of
%% ha-mode 'nodes'. 'exactly' takes precedence
merge_policy_value(<<"ha-params">>, Val, _OpVal) when is_integer(Val) ->
Val;
merge_policy_value(<<"ha-params">>, _Val, OpVal) when is_integer(OpVal) ->
OpVal;
%% Both values are lists, of ha-mode 'nodes', max length takes precedence.
merge_policy_value(<<"ha-params">>, Val, OpVal) ->
if length(Val) > length(OpVal) ->
Val;
true ->
OpVal
end.
4 changes: 4 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,13 @@ ssl_options.fail_if_no_peer_cert = true",
default_policies.operator.a.expires = 1h
default_policies.operator.a.queue_pattern = apple
default_policies.operator.a.vhost_pattern = banana
default_policies.operator.a.classic_queues.ha_mode = exactly
default_policies.operator.a.classic_queues.ha_params = 2
",
[{rabbit, [{default_policies, [{operator, [
{<<"a">>, [{<<"expires">>, 3600000},
{<<"ha-mode">>, "exactly"},
{<<"ha-params">>, 2},
{<<"queue-pattern">>, "apple"},
{<<"vhost-pattern">>, "banana"}]}]}]}]}],
[]},
Expand Down
77 changes: 77 additions & 0 deletions deps/rabbit/test/policy_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").


-compile(export_all).

all() ->
Expand All @@ -20,6 +21,7 @@ all() ->
groups() ->
[
{cluster_size_2, [], [
target_count_policy,
policy_ttl,
operator_policy_ttl,
operator_retroactive_policy_ttl,
Expand Down Expand Up @@ -149,6 +151,59 @@ operator_retroactive_policy_publish_ttl(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.

target_count_policy(Config) ->
[Server | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = <<"policy_ha">>,
declare(Ch, QName),
BNodes = [atom_to_binary(N) || N <- Nodes],

AllPolicy = [{<<"ha-mode">>, <<"all">>}],
ExactlyPolicyOne = [{<<"ha-mode">>, <<"exactly">>},
{<<"ha-params">>, 1}],
ExactlyPolicyTwo = [{<<"ha-mode">>, <<"exactly">>},
{<<"ha-params">>, 2}],
NodesPolicyAll = [{<<"ha-mode">>, <<"nodes">>},
{<<"ha-params">>, BNodes}],
NodesPolicyOne = [{<<"ha-mode">>, <<"nodes">>},
{<<"ha-params">>, [hd(BNodes)]}],

%% ALL has precedence
Opts = #{config => Config,
server => Server,
qname => QName},

verify_policies(AllPolicy, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"all">>}], Opts),

verify_policies(ExactlyPolicyTwo, AllPolicy, [{<<"ha-mode">>, <<"all">>}], Opts),

verify_policies(AllPolicy, NodesPolicyAll, [{<<"ha-mode">>, <<"all">>}], Opts),

verify_policies(NodesPolicyAll, AllPolicy, [{<<"ha-mode">>, <<"all">>}], Opts),

%% exactly has precedence over nodes
verify_policies(ExactlyPolicyTwo, NodesPolicyAll,[{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),

verify_policies(NodesPolicyAll, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),

%% Highest exactly value has precedence
verify_policies(ExactlyPolicyTwo, ExactlyPolicyOne, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),

verify_policies(ExactlyPolicyOne, ExactlyPolicyTwo, [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}], Opts),

%% Longest node count has precedence
SortedNodes = lists:sort(BNodes),
verify_policies(NodesPolicyAll, NodesPolicyOne, [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, SortedNodes}], Opts),
verify_policies(NodesPolicyOne, NodesPolicyAll, [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, SortedNodes}], Opts),

delete(Ch, QName),
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy">>),
rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"op_policy">>),
rabbit_ct_client_helpers:close_channel(Ch),
rabbit_ct_client_helpers:close_connection(Conn),
passed.


%%----------------------------------------------------------------------------


Expand Down Expand Up @@ -201,4 +256,26 @@ get_messages(Number, Ch, Q) ->
exit(failed)
end.

check_policy_value(Server, QName, Value) ->
{ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, QName)]),
proplists:get_value(Value, rpc:call(Server, rabbit_policy, effective_definition, [Q])).

verify_policies(Policy, OperPolicy, VerifyFuns, #{config := Config,
server := Server,
qname := QName}) ->
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"policy">>,
<<"policy_ha">>, <<"queues">>,
Policy),
rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"op_policy">>,
<<"policy_ha">>, <<"queues">>,
OperPolicy),
verify_policy(VerifyFuns, Server, QName).

verify_policy([], _, _) ->
ok;
verify_policy([{HA, Expect} | Tail], Server, QName) ->
Expect = check_policy_value(Server, QName, HA),
verify_policy(Tail, Server, QName).


%%----------------------------------------------------------------------------

0 comments on commit fbe83e4

Please sign in to comment.