Speed up fanout exchange #14546
Merged
Conversation
Force-pushed from 9f8112c to 36b0954
Force-pushed from a1a6bbc to 1c1c91f
Add test case for binding args Khepri regression

This commit adds a test case for a regression/bug that occurs in Khepri.
```
make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=mnesia
```
succeeds, but
```
make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=khepri
```
fails.

The problem is that the ETS table `rabbit_khepri_index_route` cannot differentiate between two bindings with different binding arguments, and therefore deletes entries too early, leading to wrong routing decisions.

The solution to this bug is to include the binding arguments in the `rabbit_khepri_index_route` projection, similar to how the binding args are also included in the `rabbit_index_route` Mnesia table.

This bug/regression is an edge case: it occurs only if the source exchange type is `direct` or `fanout` and if client apps use different binding arguments. Note that such binding arguments are entirely ignored when RabbitMQ makes routing decisions for the `direct` or `fanout` exchange. However, client apps might use binding arguments to attach metadata to a binding, for example `app-id`, `user`, or `purpose`, and might use this metadata as a form of reference counting when deciding to delete `auto-delete` exchanges, or just for informational/operational purposes.
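For illustration, a minimal hedged Erlang sketch of the failure mode (the table shape is simplified and hypothetical, not the actual projection record): when the binding arguments are not part of the stored object, two bindings that differ only in their arguments collapse onto a single ETS entry, so unbinding one of them removes the entry the other still needs.

```erlang
%% Hypothetical sketch: key = source exchange, value = destination queue,
%% binding arguments deliberately NOT part of the object (pre-fix behaviour).
binding_args_collision() ->
    T = ets:new(index_route_sketch, [bag]),
    %% Two bindings amq.fanout -> q1 that differ only in their arguments:
    true = ets:insert(T, {<<"amq.fanout">>, <<"q1">>}),  %% args: purpose = audit
    true = ets:insert(T, {<<"amq.fanout">>, <<"q1">>}),  %% args: purpose = billing
    %% A bag table keeps only one copy of identical objects, so deleting
    %% either binding removes the entry the other binding still relies on:
    true = ets:delete_object(T, {<<"amq.fanout">>, <<"q1">>}),
    [] = ets:lookup(T, <<"amq.fanout">>),  %% q1 is no longer routed to
    ok.
```

Including the binding arguments in the stored object, as the fix does for the projection, keeps the two entries distinct, matching the `rabbit_index_route` Mnesia behaviour.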
Speed up fanout exchange

Resolves #14531

## What?

Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below).

In addition to the fanout exchange, a similar speed-up is achieved for the following exchange types:
* modulus hash
* random
* recent history

This applies only if Khepri is enabled.

## How?

Use an additional routing table (projection) whose table key is the source exchange. Looking up the destinations then happens by ETS table key. Prior to this commit, CPUs were busy compiling the same match spec for every incoming message.

## Benchmark

1. Start RabbitMQ:
```
make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
  RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management"
```
where `advanced.config` contains:
```
[
 {rabbitmq_management_agent, [
   {disable_metrics_collector, true}
 ]}
].
```
2. Create a queue and binding:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1
```
3. Create the load:
```
java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60
```

Before this commit:
```
sending rate avg: 97394 msg/s
receiving rate avg: 97394 msg/s
```
After this commit:
```
sending rate avg: 138677 msg/s
receiving rate avg: 138677 msg/s
```

The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts:
* 13.5% before this commit
* 3.4% after this commit

## Downsides

Additional ETS memory usage for the new projection table. However, the new table does not store any binding entries for the following source exchange types:
* direct
* headers
* topic
* x-local-random
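For illustration, here is a hedged, self-contained Erlang sketch of the idea (module, table names and record shapes are made up, not the actual `rabbit_exchange`/`rabbit_db_binding` code): with the old table shape, finding all destinations of a source exchange needs a match spec that is evaluated per message, whereas a table keyed on the source exchange reduces routing to a plain ETS key lookup.

```erlang
-module(route_lookup_sketch).
-export([demo/0]).

%% Hypothetical sketch only; RabbitMQ's real tables store richer records.
demo() ->
    %% Old shape: the source exchange is only part of the key.
    ByBinding = ets:new(route_by_binding_sketch, [bag]),
    %% New shape: the source exchange itself is the key.
    BySource = ets:new(route_by_source_sketch, [bag]),

    Src = {resource, <<"/">>, exchange, <<"amq.fanout">>},
    Dst = {resource, <<"/">>, queue, <<"q1">>},
    true = ets:insert(ByBinding, {{Src, Dst}, []}),  %% {Key = {Source, Dest}, Args}
    true = ets:insert(BySource, {Src, Dst}),         %% Key = Source

    %% Before: destinations found via a match spec built for every message.
    MatchSpec = [{{{Src, '$1'}, '_'}, [], ['$1']}],
    [Dst] = ets:select(ByBinding, MatchSpec),

    %% After: destinations found by ETS table key, no match spec involved.
    [Dst] = [D || {_, D} <- ets:lookup(BySource, Src)],
    ok.
```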
Add exchange binding tests

Test that exchange bindings work correctly with the new projection tables `rabbit_khepri_route_by_source` and `rabbit_khepri_route_by_source_key`.
Force-pushed from 1c1c91f to 4876315
ansd commented on Sep 16, 2025
dumbbell requested changes on Sep 16, 2025
Always register all projections: Khepri won't modify a projection that is already registered (based on its name).
Protect ets:lookup_element/4 in try catch: see #11667 (comment) for rationale.
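As a rough, hedged sketch of that guard (function and table names are made up, and `ets:lookup_element/4` requires OTP 26+): the `Default` argument only covers a missing key, so the `try`/`catch` is still needed for the case where the projection table itself does not exist yet, e.g. because the projection has not been registered.

```erlang
%% Illustrative only, not the actual rabbit_db_binding code.
lookup_destinations(Table, SourceExchange) ->
    try
        %% Returns [] when the key is absent ...
        ets:lookup_element(Table, SourceExchange, 2, [])
    catch
        %% ... but still raises badarg when the table does not exist yet.
        error:badarg ->
            []
    end.
```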
dumbbell approved these changes on Sep 17, 2025
the-mikedavis approved these changes on Sep 17, 2025
the-mikedavis (Collaborator) left a comment
👍
I see similar results locally for that perf-test scenario.

main:
```
id: test-100516-125, sending rate avg: 95139 msg/s
id: test-100516-125, receiving rate avg: 95139 msg/s
```

This PR:
```
id: test-100853-180, sending rate avg: 140581 msg/s
id: test-100853-180, receiving rate avg: 140581 msg/s
```
ansd (Member, Author)
As always, thanks a lot to both of you @dumbbell @the-mikedavis for your thorough reviews!
mergify bot pushed a commit that referenced this pull request on Sep 17, 2025
* Add test case for binding args Khepri regression
* Fix regression with Khepri binding args (fixes #14533)
* Speed up fanout exchange (resolves #14531)
* Add exchange binding tests
* Always register all projections
* Protect ets:lookup_element/4 in try catch
(cherry picked from commit a66c716)
ansd added a commit that referenced this pull request on Sep 17, 2025
(cherry picked from commit a66c716) Co-authored-by: David Ansari <david.ansari@gmx.de>
ansd added a commit that referenced this pull request on Sep 18, 2025
ansd added a commit that referenced this pull request on Sep 18, 2025
(cherry picked from commit b3a58b8)
michaelklishin pushed a commit that referenced this pull request on Sep 24, 2025
michaelklishin pushed a commit that referenced this pull request on Sep 24, 2025
Resolves #14531
Resolves #14533
## What?
Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below).
In addition to the fanout exchange, a similar speed-up is achieved for the following exchange types:
* modulus hash
* random
* recent history

This applies only if Khepri is enabled.
## How?
Use an additional routing table (projection) `rabbit_khepri_route_by_source` whose table key is the source exchange. Looking up the destinations then happens by ETS table key.
Prior to this commit, CPUs were busy compiling the same match spec for every incoming message.
For the bug fix of #14533, we also introduce a new projection called `rabbit_khepri_route_by_source_key`. This can be thought of as a v2 of the old `rabbit_khepri_index_route` projection. The old `rabbit_khepri_index_route` projection gets deleted in the `rabbitmq_4.2.0` feature flag callback. The two new projections are registered at boot time (as discussed in #14543 (comment)).

## Benchmark
1. Start RabbitMQ:
```
make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
  RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management"
```
where `advanced.config` contains:
```
[
 {rabbitmq_management_agent, [
   {disable_metrics_collector, true}
 ]}
].
```
2. Create a queue and binding:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1
```
3. Create the load:
```
java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60
```

Before this commit:
```
sending rate avg: 97394 msg/s
receiving rate avg: 97394 msg/s
```
After this commit:
```
sending rate avg: 138677 msg/s
receiving rate avg: 138677 msg/s
```

The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts:
* 13.5% before this commit
* 3.4% after this commit

## Downsides
Additional ETS memory usage for the new projection table.
However, the new table does not store any binding entries for the following source exchange types:
* direct
* headers
* topic
* x-local-random
Also, care must be taken to not add extensive amounts of bindings with the same source exchange to projection `rabbit_khepri_route_by_source`. For fanout exchanges, binding an extensive amount of queues (e.g. thousands) to the same fanout exchange is nonsensical anyway. One edge case could be binding thousands of queues to the `random` exchange type. But even such an exotic use case could be realised efficiently by using intermediate random exchanges (i.e. exchange-to-exchange bindings), as sketched below. 🙂
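For illustration only, a hedged sketch of that intermediate-exchange idea using the same `rabbitmqadmin` tool as in the benchmark (exchange and queue names are made up, and the `x-random` type assumes the `rabbitmq_random_exchange` plugin is enabled):

```
# Instead of binding thousands of queues to one random exchange, bind a few
# intermediate x-random exchanges to it (exchange-to-exchange bindings) and
# spread the queues across the intermediates.
deps/rabbitmq_management/bin/rabbitmqadmin declare exchange name=rnd type=x-random durable=true
deps/rabbitmq_management/bin/rabbitmqadmin declare exchange name=rnd.0 type=x-random durable=true
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=rnd destination=rnd.0 destination_type=exchange
deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=rnd.0 destination=q1
```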