-
Notifications
You must be signed in to change notification settings - Fork 646
Description
In what version(s) of Spring AMQP are you seeing this issue?
For example:
3.2.6
Describe the bug
For sync Message handlers, the error handler (RabbitListenerErrorHandler) can throw an AmqpRejectAndDontRequeueException, the message is then deadlettered (asuming the in-queue has a deadletter-exchange configured).
For an async listeners (methods annotated with @RabbitListener end returning Mono or Future or being a Kotlin suspend functions) that is not the case. When the Error Hander throws any exception (including an AmqpRejectAndDontRequeueException), the message will be nacked with requeue=true resulting an an endless loop. When Error handler returns a value, the messase is not acked either.
The logged error message is:
16:47:50.649 [DefaultDispatcher-worker-2 @coroutine#3288] ERROR o.s.a.r.l.a.MessagingMessageListenerAdapter - Future, Mono, or suspend function was completed with an exception for (Body:'{"this":"is not a valid message"}' MessageProperties [headers={x-delivery-count=3135, traceparent=00-68a5e0018bfa76845f473c7ef2501588-192f6d5acaefac54-00}, type=someEvent, correlationId=123-abc-456, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=..., deliveryTag=63, consumerTag=amq.ctag-FVilYqFsNGCOikPuqNJU6w, consumerQueue=...])
To Reproduce
have a Listener
@Component
class SomeListener {
@RabbitListener(..., errorHandler = "errorHandler")
fun doSomething(message: Message) = Mono.error (
throw IllegalStateException("Oops, I cannot process any message")
)
}
@Component
class ErrorHandler: RabbitListenerErrorHandler {
override fun handleError(
amqpMessage: Message,
channel: Channel,
message: org.springframework.messaging.Message<*>?,
exception: ListenerExecutionFailedException,
): Unit? {
// just deadletter everything
throw AmqpRejectAndDontRequeueException(...)
}
Expected behavior
The message is deadlettered.
** Workaround **
Let the RabbitListenerErrorHandler for these async rabbit listeners ack/nack the message directly insted of returning null or throwing an exception. NOTE: you cannot reuse that errorhandler for sync rabbit Listeners then.