Automatic rate limiting feature might cause duplicate messages #1236
Description
Describe the bug
Description
When using the automatic rate limiting feature with the RabbitMQ transport, the functionality might produce duplicate messages in some edge cases.
Expected behavior
When automatic rate limiting kicks in, message processing should be reduced to a single message at the time with the configured delay interval between further processing attempts until message processing succeeds again.
Actual behavior
In the specific cases where the configured consecutive failure counter is reached and the rate-limiting starts to kick in, a message that should be handed over to delayed retries or to the error queue can be duplicated. There is no impact on messages that are handled via immediate retries. This also only happens when the failure counter reaches the limit, further failures while the rate limiting is enabled do not create message duplicates.
This is caused by the implementation which tries to reduce the transports concurrency setting once the configured failure counter is reached. The concurrency change is achieved by reconnecting to the broker with adjusted concurrency settings. However, this is done while the (failed) message is still being processed. After the connection is reset, the error handling pipeline tries to send a copy of the message for delayed delivery or to the error queue, while the failed message is expected to be acknowledged afterward. While sending messages works with the new connection, the currently processed message can't be acknowledged via the new connection. Therefore the message will be available for processing again even though a copy has been sent for delayed retries or to the error queue.
Versions
- 8.0
- not yet verified on version 7
Steps to reproduce
With the following configuration:
endpointConfiguration.Recoverability()
.Delayed(d => d.NumberOfRetries(1))
.Immediate(i => i.NumberOfRetries(1))
.OnConsecutiveFailures(2,
new RateLimitSettings(TimeSpan.FromSeconds(10),
token =>
{
Console.WriteLine("rate limit started");
return Task.CompletedTask;
}, token =>
{
Console.WriteLine("rate limit ended");
return Task.CompletedTask;
}));
send a message to a handler throwing an exception. See the following log example.
Relevant log output
2023-05-01 16:07:51.737 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:07:51.932 INFO Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.MutateIncomingMessageBehavior.InvokeIncomingMessageMutators(IIncomingLogicalMessageContext context, Func`2 next)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.UnitOfWorkBehavior.InvokeUnitsOfWork(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.UnitOfWorkBehavior.InvokeUnitsOfWork(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.MutateIncomingTransportMessageBehavior.InvokeIncomingTransportMessagesMutators(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:07:51:736450 Z
Handler failure time: 2023-05-01 14:07:51:767432 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:07:51.953 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:07:52.060 WARN The consecutive failures circuit breaker is now in the armed state
2023-05-01 16:07:52.062 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:07:52.063 INFO Calling a change concurrency and reconnecting with new value 1.
2023-05-01 16:07:52.076 INFO 'Samples.RabbitMQ.SimpleReceiver MessagePump': Attempting to reconnect in 10 seconds.
2023-05-01 16:08:02.080 WARN Delayed Retry will reschedule message '96dff5e4-f63b-4d60-8a48-aff500e8df38' after a delay of 00:00:10 because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:07:51:953324 Z
Handler failure time: 2023-05-01 14:07:51:967382 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:02.103 INFO 'Samples.RabbitMQ.SimpleReceiver MessagePump': Connection to the broker reestablished successfully.
2023-05-01 16:08:02.106 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 0
2023-05-01 16:08:02.214 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:02.217 WARN Failed to acknowledge message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because the channel was closed. The message was returned to the queue.
RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text='Goodbye', classId=0, methodId=0
at RabbitMQ.Client.Impl.SessionBase.Transmit(OutgoingCommand& cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory`1 body)
at RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple)
at NServiceBus.Transport.RabbitMQ.ModelExtensions.BasicAckSingle(IModel channel, UInt64 deliveryTag) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/ModelExtensions.cs:line 9
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken) in /_/src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs:line 431
2023-05-01 16:08:12.227 WARN Delayed Retry will reschedule message '96dff5e4-f63b-4d60-8a48-aff500e8df38' after a delay of 00:00:10 because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:02:106038 Z
Handler failure time: 2023-05-01 14:08:02:120234 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:12.237 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:12.416 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:22.423 INFO Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:12:237766 Z
Handler failure time: 2023-05-01 14:08:12:264706 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:22.426 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:22.593 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:32.608 INFO Immediate Retry is going to retry message '96dff5e4-f63b-4d60-8a48-aff500e8df38' because of an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:22:426253 Z
Handler failure time: 2023-05-01 14:08:22:446579 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:32.610 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:32.769 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:42.777 ERROR Moving message '96dff5e4-f63b-4d60-8a48-aff500e8df38' to the error queue 'error' because processing failed due to an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:32:610920 Z
Handler failure time: 2023-05-01 14:08:32:629076 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
2023-05-01 16:08:42.794 INFO Processing message
message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Delayed retry counter: 1
2023-05-01 16:08:42.959 INFO Pause message processing for 0:00:10 due to enabled consecutive failures mode.
2023-05-01 16:08:52.974 ERROR Moving message '96dff5e4-f63b-4d60-8a48-aff500e8df38' to the error queue 'error' because processing failed due to an exception:
System.Exception: pewpew
at MyCommandHandler.Handle(MyCommand commandMessage, IMessageHandlerContext context) in \Downloads\simple_rabbit_8_net7_0 (1)\Receiver\MyCommandHandler.cs:line 22
at (ArrayClosure, Object, Object, IMessageHandlerContext)
at NServiceBus.Pipeline.MessageHandler.Invoke(Object message, IMessageHandlerContext handlerContext)
at NServiceBus.InvokeHandlerTerminator.Terminate(IInvokeHandlerContext context)
at NServiceBus.LoadHandlersConnector.Invoke(IIncomingLogicalMessageContext context, Func`2 stage)
at NServiceBus.DeserializeMessageConnector.Invoke(IIncomingPhysicalMessageContext context, Func`2 stage)
at NServiceBus.ProcessingStatisticsBehavior.Invoke(IIncomingPhysicalMessageContext context, Func`2 next)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.RetryAcknowledgementBehavior.Invoke(ITransportReceiveContext context, Func`2 next)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.MainPipelineExecutor.Invoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.AdaptiveMessageReceiver.WrappedInvoke(MessageContext messageContext, CancellationToken cancellationToken)
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, CancellationToken messageProcessingCancellationToken)
Exception details:
Message type: MyCommand
Handler type: MyCommandHandler
Handler start time: 2023-05-01 14:08:42:794772 Z
Handler failure time: 2023-05-01 14:08:42:813593 Z
Handler canceled: False
Message ID: 96dff5e4-f63b-4d60-8a48-aff500e8df38
Pipeline canceled: False
Additional Information
Workarounds
Possible solutions
- Ensure that reconnecting only happens when the remaining inflight messages have been processed.
Activity