Skip to content

Commit a1a6bbc

Browse files
committed
Speed up fanout exchange
Resolves #14531 ## What? Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below). In addition to the fanout exchange, a similar speed up is achieved for the following exchange types: * modulus hash * random * recent history This applies only if Khepri is enabled. ## How? Use an additional routing table (projection) whose table key is the source exchange. Looking up the destinations happens then by an ETS table key. Prior to this commit, CPUs were busy compiling the same match spec for every incoming message. ## Benchmark 1. Start RabbitMQ: ``` make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management" ``` where `advanced.config` contains: ``` [ {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` 2. Create a queue and binding: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1 ``` 3. Create the load ``` java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60 ``` Before this commit: ``` sending rate avg: 97394 msg/s receiving rate avg: 97394 msg/s ``` After this commit: ``` sending rate avg: 138677 msg/s receiving rate avg: 138677 msg/s ``` The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts: * 13.5% before this commit * 3.4% after this commit ## Downsides Additional ETS memory usage for the new projection table. However, the new table does not store any binding entries for the following source exchange types: * direct * headers * topic * x-local-random
1 parent ccb56fc commit a1a6bbc

File tree

5 files changed

+160
-95
lines changed

5 files changed

+160
-95
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,5 +216,6 @@
216216
{'rabbitmq_4.2.0',
217217
#{desc => "Allows rolling upgrades to 4.2.x",
218218
stability => stable,
219-
depends_on => ['rabbitmq_4.1.0']
219+
depends_on => ['rabbitmq_4.1.0'],
220+
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
220221
}}).

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@
5656
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
5757
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
5858
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
59-
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
59+
-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key).
60+
-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source).
6061

6162
%% -------------------------------------------------------------------
6263
%% exists().
@@ -707,21 +708,14 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
707708
end.
708709

709710
match_routing_key_in_khepri(Src, ['_']) ->
710-
try
711-
MatchHead = #index_route{source_key = {Src, '_'},
712-
destination = '$1',
713-
_ = '_'},
714-
ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}])
715-
catch
716-
error:badarg ->
717-
[]
718-
end;
711+
ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION,
712+
Src, #route_by_source.destination, []);
719713
match_routing_key_in_khepri(Src, RoutingKeys) ->
720714
lists:foldl(
721715
fun(RK, Acc) ->
722716
try
723717
Dst = ets:lookup_element(
724-
?KHEPRI_INDEX_ROUTE_PROJECTION,
718+
?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION,
725719
{Src, RK},
726720
#index_route.destination),
727721
Dst ++ Acc

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 96 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@
170170
%% equivalent cluster
171171
-export([khepri_db_migration_enable/1,
172172
khepri_db_migration_post_enable/1,
173+
enable_feature_flag/1,
173174
is_enabled/0, is_enabled/1,
174175
get_feature_state/0, get_feature_state/1,
175176
handle_fallback/1]).
@@ -332,8 +333,10 @@ init(IsVirgin) ->
332333
"up to the Raft cluster leader", [],
333334
#{domain => ?RMQLOG_DOMAIN_DB}),
334335
ok ?= case IsVirgin of
335-
true -> register_projections();
336-
false -> ok
336+
true ->
337+
register_projections();
338+
false ->
339+
register_4_2_0_projections()
337340
end,
338341
%% Delete transient queues on init.
339342
%% Note that we also do this in the
@@ -1318,25 +1321,34 @@ register_projections() ->
13181321
fun register_rabbit_per_vhost_runtime_parameters_projection/0,
13191322
fun register_rabbit_user_permissions_projection/0,
13201323
fun register_rabbit_bindings_projection/0,
1321-
fun register_rabbit_index_route_projection/0,
1324+
fun register_rabbit_route_by_source_key_projection/0,
1325+
fun register_rabbit_route_by_source_projection/0,
13221326
fun register_rabbit_topic_graph_projection/0],
1323-
rabbit_misc:for_each_while_ok(
1324-
fun(RegisterFun) ->
1325-
case RegisterFun() of
1326-
ok ->
1327-
ok;
1328-
%% Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
1329-
%% would return `{error, exists}` for projections which
1330-
%% already exist.
1331-
{error, exists} ->
1332-
ok;
1333-
%% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
1334-
{error, {khepri, projection_already_exists, _Info}} ->
1335-
ok;
1336-
{error, _} = Error ->
1337-
Error
1338-
end
1339-
end, RegFuns).
1327+
rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns).
1328+
1329+
%% This function registers projections introduced in 4.2.0. In a mixed version
1330+
%% cluster, these new projections will appear but won't be used on older nodes.
1331+
%% This function can be deleted after feature flag rabbitmq_4.2.0 becomes required.
1332+
register_4_2_0_projections() ->
1333+
RegFuns = [fun register_rabbit_route_by_source_key_projection/0,
1334+
fun register_rabbit_route_by_source_projection/0],
1335+
rabbit_misc:for_each_while_ok(fun register_projection/1, RegFuns).
1336+
1337+
register_projection(RegisterFun) ->
1338+
case RegisterFun() of
1339+
ok ->
1340+
ok;
1341+
%% Before Khepri v0.13.0, `khepri:register_projection/1,2,3`
1342+
%% would return `{error, exists}` for projections which
1343+
%% already exist.
1344+
{error, exists} ->
1345+
ok;
1346+
%% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead.
1347+
{error, {khepri, projection_already_exists, _Info}} ->
1348+
ok;
1349+
{error, _} = Error ->
1350+
Error
1351+
end.
13401352

13411353
register_rabbit_exchange_projection() ->
13421354
Name = rabbit_khepri_exchange,
@@ -1414,7 +1426,7 @@ register_rabbit_bindings_projection() ->
14141426
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
14151427
khepri:register_projection(?STORE_ID, PathPattern, Projection).
14161428

1417-
register_rabbit_index_route_projection() ->
1429+
register_rabbit_route_by_source_key_projection() ->
14181430
MapFun = fun(_Path, #binding{source = Source,
14191431
key = Key,
14201432
destination = Destination,
@@ -1427,16 +1439,55 @@ register_rabbit_index_route_projection() ->
14271439
Options = #{type => bag,
14281440
keypos => #index_route.source_key,
14291441
read_concurrency => true},
1430-
Projection = khepri_projection:new(
1431-
rabbit_khepri_index_route, ProjectionFun, Options),
1432-
IgnoreHeadersAndTopic = #if_data_matches{
1433-
pattern = #exchange{type = '$1', _ = '_'},
1434-
conditions = [{'andalso',
1435-
{'=/=', '$1', headers},
1436-
{'=/=', '$1', topic}}]},
1442+
Projection = khepri_projection:new(rabbit_khepri_route_by_source_key,
1443+
ProjectionFun,
1444+
Options),
1445+
Exchange = #if_data_matches{
1446+
pattern = #exchange{type = '$1', _ = '_'},
1447+
conditions = [{'andalso',
1448+
{'=/=', '$1', headers},
1449+
{'=/=', '$1', topic},
1450+
{'=/=', '$1', fanout},
1451+
{'=/=', '$1', 'x-jms-topic'},
1452+
{'=/=', '$1', 'x-random'}
1453+
}]},
14371454
PathPattern = rabbit_db_binding:khepri_route_path(
14381455
_VHost = ?KHEPRI_WILDCARD_STAR,
1439-
_Exchange = IgnoreHeadersAndTopic,
1456+
Exchange,
1457+
_Kind = ?KHEPRI_WILDCARD_STAR,
1458+
_DstName = ?KHEPRI_WILDCARD_STAR,
1459+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
1460+
khepri:register_projection(?STORE_ID, PathPattern, Projection).
1461+
1462+
register_rabbit_route_by_source_projection() ->
1463+
MapFun = fun(_Path, #binding{source = Source,
1464+
key = Key,
1465+
destination = Destination,
1466+
args = Args}) ->
1467+
#route_by_source{source = Source,
1468+
key = Key,
1469+
destination = Destination,
1470+
args = Args}
1471+
end,
1472+
ProjectionFun = projection_fun_for_sets(MapFun),
1473+
Options = #{type => bag,
1474+
keypos => #route_by_source.source,
1475+
read_concurrency => true},
1476+
Projection = khepri_projection:new(rabbit_khepri_route_by_source,
1477+
ProjectionFun,
1478+
Options),
1479+
Exchange = #if_data_matches{
1480+
pattern = #exchange{type = '$1', _ = '_'},
1481+
conditions = [{'andalso',
1482+
{'=/=', '$1', headers},
1483+
{'=/=', '$1', topic},
1484+
{'=/=', '$1', direct},
1485+
{'=/=', '$1', 'x-local-random'},
1486+
{'=/=', '$1', 'x-jms-topic'}
1487+
}]},
1488+
PathPattern = rabbit_db_binding:khepri_route_path(
1489+
_VHost = ?KHEPRI_WILDCARD_STAR,
1490+
Exchange,
14401491
_Kind = ?KHEPRI_WILDCARD_STAR,
14411492
_DstName = ?KHEPRI_WILDCARD_STAR,
14421493
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
@@ -1755,6 +1806,22 @@ khepri_db_migration_post_enable(
17551806
_ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID),
17561807
ok.
17571808

1809+
enable_feature_flag(#{command := enable,
1810+
feature_name := 'rabbitmq_4.2.0' = FeatureName}) ->
1811+
%% We unregister this projection because it's superseded by
1812+
%% rabbit_khepri_route_by_source_key introduced in 4.2.0
1813+
ProjectionName = rabbit_khepri_index_route,
1814+
Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName])
1815+
catch _:Reason -> Reason
1816+
end,
1817+
?LOG_DEBUG(
1818+
"enabling feature flag ~s unregisters projection ~s: ~tp",
1819+
[FeatureName, ProjectionName, Result],
1820+
#{domain => ?RMQLOG_DOMAIN_DB}),
1821+
ok;
1822+
enable_feature_flag(_) ->
1823+
ok.
1824+
17581825
-spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when
17591826
FeatureName :: rabbit_feature_flags:feature_name(),
17601827
Ret :: ok | {error, Reason},

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ all_tests() ->
4848
list_with_multiple_vhosts,
4949
list_with_multiple_arguments,
5050
bind_to_unknown_queue,
51-
binding_args,
51+
binding_args_direct_exchange,
52+
binding_args_fanout_exchange,
5253
%% Exchange bindings
5354
bind_and_unbind_exchange,
5455
bind_and_delete_exchange_source,
@@ -698,59 +699,60 @@ bind_to_unknown_queue(Config) ->
698699
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
699700
ok.
700701

701-
binding_args(Config) ->
702-
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
703-
{skip, _} = Skip ->
704-
Skip;
705-
ok ->
706-
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
707-
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
708-
Q = ?config(queue_name, Config),
709-
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
710-
711-
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
712-
amqp_channel:register_confirm_handler(Ch, self()),
713-
714-
%% Create two bindings that differ only in their binding arguments.
715-
Exchange = <<"amq.direct">>,
716-
RoutingKey = <<"some-key">>,
717-
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
718-
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
719-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
720-
routing_key = RoutingKey,
721-
queue = Q,
722-
arguments = BindingArgs1}),
723-
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
724-
routing_key = RoutingKey,
725-
queue = Q,
726-
arguments = BindingArgs2}),
727-
ok = amqp_channel:cast(Ch,
728-
#'basic.publish'{exchange = Exchange,
729-
routing_key = RoutingKey},
730-
#amqp_msg{payload = <<"m1">>}),
731-
receive #'basic.ack'{} -> ok
732-
after 9000 -> ct:fail(confirm_timeout)
733-
end,
734-
735-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
736-
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
737-
738-
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
739-
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
740-
routing_key = RoutingKey,
741-
queue = Q,
742-
arguments = BindingArgs1}),
743-
ok = amqp_channel:cast(Ch,
744-
#'basic.publish'{exchange = Exchange,
745-
routing_key = RoutingKey},
746-
#amqp_msg{payload = <<"m2">>}),
747-
receive #'basic.ack'{} -> ok
748-
after 9000 -> ct:fail(confirm_timeout)
749-
end,
750-
751-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
752-
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true}))
753-
end.
702+
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533
703+
binding_args_direct_exchange(Config) ->
704+
binding_args(<<"amq.direct">>, Config).
705+
706+
binding_args_fanout_exchange(Config) ->
707+
binding_args(<<"amq.fanout">>, Config).
708+
709+
binding_args(Exchange, Config) ->
710+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
711+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
712+
Q = ?config(queue_name, Config),
713+
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
714+
715+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
716+
amqp_channel:register_confirm_handler(Ch, self()),
717+
718+
%% Create two bindings that differ only in their binding arguments.
719+
RoutingKey = <<"some-key">>,
720+
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
721+
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
722+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
723+
routing_key = RoutingKey,
724+
queue = Q,
725+
arguments = BindingArgs1}),
726+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
727+
routing_key = RoutingKey,
728+
queue = Q,
729+
arguments = BindingArgs2}),
730+
ok = amqp_channel:cast(Ch,
731+
#'basic.publish'{exchange = Exchange,
732+
routing_key = RoutingKey},
733+
#amqp_msg{payload = <<"m1">>}),
734+
receive #'basic.ack'{} -> ok
735+
after 9000 -> ct:fail(confirm_timeout)
736+
end,
737+
738+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
739+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
740+
741+
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
742+
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
743+
routing_key = RoutingKey,
744+
queue = Q,
745+
arguments = BindingArgs1}),
746+
ok = amqp_channel:cast(Ch,
747+
#'basic.publish'{exchange = Exchange,
748+
routing_key = RoutingKey},
749+
#amqp_msg{payload = <<"m2">>}),
750+
receive #'basic.ack'{} -> ok
751+
after 9000 -> ct:fail(confirm_timeout)
752+
end,
753+
754+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
755+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
754756

755757
bind_and_unbind_exchange(Config) ->
756758
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

deps/rabbit_common/include/rabbit.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
-record(route, {binding, value = const}).
9595
-record(reverse_route, {reverse_binding, value = const}).
9696
-record(index_route, {source_key, destination, args = []}).
97+
-record(route_by_source, {source, key, destination, args = []}).
9798

9899
-record(binding, {source, key, destination, args = []}).
99100
-record(reverse_binding, {destination, key, source, args = []}).

0 commit comments

Comments
 (0)