Skip to content

Conversation

@mergify
Copy link

@mergify mergify bot commented Sep 16, 2024

This fixes some crash reports when using MQTT with Khepri, spotted by @mkuratczyk. With an OMQ stresstest:

omq mqtt --uri mqtt://localhost:1883 --uri mqtt://localhost:1884 --uri mqtt://localhost:1885 -x 10000 -y 10000 -r 1 --publish-to 'sensor/%d' --consume-from '/topic/sensor/%d' --mqtt-consumer-qos 1 --mqtt-publisher-qos 1

while a cluster restarts (make restart-cluster), we would see badmatch errors from matching on {ok, _} for rabbit_queue_type:delete/4 and exits for {normal, {gen_server2, call, [Pid, consumers, infinity]}}. That stress test causes queue churn since QoS1 MQTT creates transient exclusive classic queues. Restarting a node leads to very many queues being deleted which can overload Khepri and lead to timeouts.

The first commit makes a refactor to have rabbit_queue_type:delete/4 return {error, timeout} for timeout errors. {error, timeout} could already be returned and is handled in rabbit_amqqueue:delete_with/4. This change is just for consistency: in some places we returned a protocol_error record instead. The second commit handles the {error, timeout} result in rabbit_mqtt_processor.

Also included is a fix for rabbit_amqqueue:consumers/1 to catch exits: an exit can happen if another process asks for a classic queue's consumers while it is terminating. (With Khepri the terminate callback can take some time as it calls rabbit_amqqueue:internal_delete/2.)


This is an automatic backport of pull request #12317 done by Mergify.

This return value was already possible since a classic queue will return
it during termination if `rabbit_amqqueue:internal_delete/2` fails with
that value.

`rabbit_amqqueue:delete/4` already handles this value and converts it
into a protocol error and channel exit. The other caller (MQTT
processor) will be updated in a child commit.

This commit also replaces eager conversions to protocol errors in
rabbit_classic_queue, rabbit_quorum_queue and rabbit_stream_coordinator:
we should return `{error, timeout}` consistently and not hide it in
protocol errors.

(cherry picked from commit 9627903)
`delegate:invoke/2` catches errors but not exits of the delegate
process. Another process might query for a classic queue's consumers
while the classic queue is being deleted or otherwise terminating and
that would result in an exit of the calling process previously.

(cherry picked from commit a65ceb6)
@michaelklishin michaelklishin merged commit 9cbda0e into v4.0.x Sep 17, 2024
@michaelklishin michaelklishin deleted the mergify/bp/v4.0.x/pr-12317 branch September 17, 2024 02:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants