Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,6 @@
{'rabbitmq_4.2.0',
#{desc => "Allows rolling upgrades to 4.2.x",
stability => stable,
depends_on => ['rabbitmq_4.1.0']
depends_on => ['rabbitmq_4.1.0'],
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
}}).
13 changes: 7 additions & 6 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key).
-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source).

%% -------------------------------------------------------------------
%% exists().
Expand Down Expand Up @@ -708,10 +709,10 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->

match_routing_key_in_khepri(Src, ['_']) ->
try
MatchHead = #index_route{source_key = {Src, '_'},
destination = '$1',
_ = '_'},
ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}])
ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION,
Src,
#route_by_source.destination,
[])
catch
error:badarg ->
[]
Expand All @@ -721,7 +722,7 @@ match_routing_key_in_khepri(Src, RoutingKeys) ->
fun(RK, Acc) ->
try
Dst = ets:lookup_element(
?KHEPRI_INDEX_ROUTE_PROJECTION,
?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION,
{Src, RK},
#index_route.destination),
Dst ++ Acc
Expand Down
114 changes: 86 additions & 28 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
%% executed. If the migration runs concurrently, whether it started before or
%% during the execution of the Mnesia-specific anonymous function, {@link
%% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia
%% and will retry the Mnesia functino or run the Khepri function accordingly.
%% and will retry the Mnesia function or run the Khepri function accordingly.
%% The Mnesia function must be idempotent because it can be executed multiple
%% times.
%%
Expand Down Expand Up @@ -170,6 +170,7 @@
%% equivalent cluster
-export([khepri_db_migration_enable/1,
khepri_db_migration_post_enable/1,
enable_feature_flag/1,
is_enabled/0, is_enabled/1,
get_feature_state/0, get_feature_state/1,
handle_fallback/1]).
Expand Down Expand Up @@ -331,10 +332,7 @@ init(IsVirgin) ->
"local Khepri-based " ?RA_FRIENDLY_NAME " member is caught "
"up to the Raft cluster leader", [],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok ?= case IsVirgin of
true -> register_projections();
false -> ok
end,
ok ?= register_projections(),
%% Delete transient queues on init.
%% Note that we also do this in the
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
Expand Down Expand Up @@ -1318,7 +1316,8 @@ register_projections() ->
fun register_rabbit_per_vhost_runtime_parameters_projection/0,
fun register_rabbit_user_permissions_projection/0,
fun register_rabbit_bindings_projection/0,
fun register_rabbit_index_route_projection/0,
fun register_rabbit_route_by_source_key_projection/0,
fun register_rabbit_route_by_source_projection/0,
fun register_rabbit_topic_graph_projection/0],
rabbit_misc:for_each_while_ok(
fun(RegisterFun) ->
Expand Down Expand Up @@ -1414,35 +1413,78 @@ register_rabbit_bindings_projection() ->
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_index_route_projection() ->
MapFun = fun(Path, _) ->
{
VHost,
ExchangeName,
Kind,
DstName,
RoutingKey
} = rabbit_db_binding:khepri_route_path_to_args(Path),
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
Destination = rabbit_misc:r(VHost, Kind, DstName),
SourceKey = {Exchange, RoutingKey},
#index_route{source_key = SourceKey,
destination = Destination}
register_rabbit_route_by_source_key_projection() ->
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#index_route{source_key = {Source, Key},
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #index_route.source_key,
read_concurrency => true},
Projection = khepri_projection:new(
rabbit_khepri_index_route, ProjectionFun, Options),
DirectOrFanout = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic}}]},
Projection = khepri_projection:new(rabbit_khepri_route_by_source_key,
ProjectionFun,
Options),
Exchange = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic},
{'=/=', '$1', fanout},
{'=/=', '$1', 'x-jms-topic'},
{'=/=', '$1', 'x-random'}
}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
_Exchange = DirectOrFanout,
Exchange,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

register_rabbit_route_by_source_projection() ->
MapFun = fun(_Path, #binding{source = Source,
key = Key,
destination = Destination,
args = Args}) ->
#route_by_source{source = Source,
key = Key,
destination = Destination,
args = Args}
end,
ProjectionFun = projection_fun_for_sets(MapFun),
Options = #{type => bag,
keypos => #route_by_source.source,
read_concurrency => true},
Projection = khepri_projection:new(rabbit_khepri_route_by_source,
ProjectionFun,
Options),
%% For some exchange types we know that they won't use this projection.
%% So we exclude such bindings for two reasons:
%% 1. Lower overall ETS memory usage
%% 2. "Avoid inserting an extensive amount of objects with the same key.
%% It will hurt insert and lookup performance as well as real time characteristics
%% of the runtime environment (hash bucket linear search do not yield)."
%% Example: same source direct exchange with 100k different binding keys.
%% In future, rather than exchange types exclusion as done here, a nicer approach
%% would be that each exchange requiring routing lookup by only source exchange
%% advertises this access pattern, e.g. as a boolean flag in the #exchange.options field.
Exchange = #if_data_matches{
pattern = #exchange{type = '$1', _ = '_'},
conditions = [{'andalso',
{'=/=', '$1', headers},
{'=/=', '$1', topic},
{'=/=', '$1', direct},
{'=/=', '$1', 'x-local-random'},
{'=/=', '$1', 'x-jms-topic'}
}]},
PathPattern = rabbit_db_binding:khepri_route_path(
_VHost = ?KHEPRI_WILDCARD_STAR,
Exchange,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
Expand Down Expand Up @@ -1761,6 +1803,22 @@ khepri_db_migration_post_enable(
_ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID),
ok.

enable_feature_flag(#{command := enable,
feature_name := 'rabbitmq_4.2.0' = FeatureName}) ->
%% We unregister this projection because it's superseded by
%% rabbit_khepri_route_by_source_key introduced in 4.2.0
ProjectionName = rabbit_khepri_index_route,
Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName])
catch _:Reason -> Reason
end,
?LOG_DEBUG(
"enabling feature flag ~s unregisters projection ~s: ~tp",
[FeatureName, ProjectionName, Result],
#{domain => ?RMQLOG_DOMAIN_DB}),
ok;
enable_feature_flag(_) ->
ok.

-spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when
FeatureName :: rabbit_feature_flags:feature_name(),
Ret :: ok | {error, Reason},
Expand Down
111 changes: 99 additions & 12 deletions deps/rabbit/test/bindings_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).
-compile(export_all).

suite() ->
[{timetrap, 5 * 60000}].
Expand Down Expand Up @@ -49,8 +48,12 @@ all_tests() ->
list_with_multiple_vhosts,
list_with_multiple_arguments,
bind_to_unknown_queue,
binding_args_direct_exchange,
binding_args_fanout_exchange,

%% Exchange bindings
bind_and_unbind_exchange,
bind_and_unbind_direct_exchange,
bind_and_unbind_fanout_exchange,
bind_and_delete_exchange_source,
bind_and_delete_exchange_destination,
bind_to_unknown_exchange,
Expand Down Expand Up @@ -116,6 +119,7 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

bind_and_unbind(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Expand Down Expand Up @@ -697,33 +701,116 @@ bind_to_unknown_queue(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
ok.

bind_and_unbind_exchange(Config) ->
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533
binding_args_direct_exchange(Config) ->
binding_args(<<"amq.direct">>, Config).

binding_args_fanout_exchange(Config) ->
binding_args(<<"amq.fanout">>, Config).

binding_args(Exchange, Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),

#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),

%% Create two bindings that differ only in their binding arguments.
RoutingKey = <<"some-key">>,
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs1}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs2}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m1">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,

?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),

%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
routing_key = RoutingKey,
queue = Q,
arguments = BindingArgs1}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m2">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,

?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).

bind_and_unbind_direct_exchange(Config) ->
bind_and_unbind_exchange(<<"direct">>, Config).

bind_and_unbind_fanout_exchange(Config) ->
bind_and_unbind_exchange(<<"fanout">>, Config).

bind_and_unbind_exchange(Type, Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
X = ?config(exchange_name, Config),
Q = ?config(queue_name, Config),
RoutingKey = <<"some key">>,
SourceExchange = <<"amq.", Type/binary>>,

?assertEqual([],
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),

#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
type = Type}),
%% Let's bind to other exchange
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X,
source = <<"amq.direct">>,
routing_key = <<"key">>}),
source = SourceExchange,
routing_key = RoutingKey}),

DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
rabbit_misc:r(<<"/">>, exchange, X),
<<"key">>, []),
Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange),
rabbit_misc:r(<<"/">>, exchange, X),
RoutingKey, []),

?assertEqual([DirectBinding],
?assertEqual([Binding],
lists:sort(
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))),

%% Test that a message gets routed:
%% exchange -> exchange -> queue
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X,
routing_key = RoutingKey,
queue = Q}),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
ok = amqp_channel:cast(Ch,
#'basic.publish'{exchange = SourceExchange,
routing_key = RoutingKey},
#amqp_msg{payload = <<"m1">>}),
receive #'basic.ack'{} -> ok
after 9000 -> ct:fail(confirm_timeout)
end,
?assertEqual(#'queue.delete_ok'{message_count = 1},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),

#'exchange.unbind_ok'{} = amqp_channel:call(Ch,
#'exchange.unbind'{destination = X,
source = <<"amq.direct">>,
routing_key = <<"key">>}),
source = SourceExchange,
routing_key = RoutingKey}),

?assertEqual([],
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit_common/include/rabbit.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
-record(route, {binding, value = const}).
-record(reverse_route, {reverse_binding, value = const}).
-record(index_route, {source_key, destination, args = []}).
-record(route_by_source, {source, key, destination, args = []}).

-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
Expand Down
Loading