Error handler for async RabbitListeners (Mono, Future, suspend) not reacting to AmqpRejectAndDontRequeueException #3163

@jensbaitingerbosch

In what version(s) of Spring AMQP are you seeing this issue?

3.2.6

Describe the bug

For sync message handlers, the error handler (RabbitListenerErrorHandler) can throw an AmqpRejectAndDontRequeueException; the message is then dead-lettered (assuming the in-queue has a dead-letter exchange configured).

For async listeners (methods annotated with @RabbitListener that return Mono or Future, or that are Kotlin suspend functions), that is not the case. When the error handler throws any exception (including an AmqpRejectAndDontRequeueException), the message is nacked with requeue=true, resulting in an endless redelivery loop. When the error handler returns a value, the message is not acked either.

The logged error message is:

16:47:50.649 [DefaultDispatcher-worker-2 @coroutine#3288] ERROR o.s.a.r.l.a.MessagingMessageListenerAdapter - Future, Mono, or suspend function was completed with an exception for (Body:'{"this":"is not a valid message"}' MessageProperties [headers={x-delivery-count=3135, traceparent=00-68a5e0018bfa76845f473c7ef2501588-192f6d5acaefac54-00}, type=someEvent, correlationId=123-abc-456, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=..., deliveryTag=63, consumerTag=amq.ctag-FVilYqFsNGCOikPuqNJU6w, consumerQueue=...])

To Reproduce
Have a listener and an error handler like these:

@Component
class SomeListener {
    @RabbitListener(..., errorHandler = "errorHandler")
    // Async listener: the returned Mono completes with an error
    fun doSomething(message: Message): Mono<Void> = Mono.error(
        IllegalStateException("Oops, I cannot process any message")
    )
}

@Component
class ErrorHandler : RabbitListenerErrorHandler {
    override fun handleError(
        amqpMessage: Message,
        channel: Channel,
        message: org.springframework.messaging.Message<*>?,
        exception: ListenerExecutionFailedException,
    ): Unit? {
        // just dead-letter everything
        throw AmqpRejectAndDontRequeueException(...)
    }
}

Expected behavior

The message is rejected without requeue and dead-lettered, as it is for sync listeners.
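
Dead-lettering presupposes that the listener queue declares a dead-letter exchange. For reference, a minimal sketch of such a topology might look like the following. The names `some.in`, `some.dlx` and `some.dlq` are made up for illustration and are not from the reporter's setup. It also assumes a RabbitAdmin is present (for example via Spring Boot auto-configuration) to declare the beans on the broker.

```kotlin
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Declarables
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.QueueBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// Hypothetical topology; all queue and exchange names are placeholders.
@Configuration(proxyBeanMethods = false)
class QueueConfig {

    @Bean
    fun deadLetterTopology(): Declarables {
        val dlx = DirectExchange("some.dlx")
        val dlq = QueueBuilder.durable("some.dlq").build()

        // Messages rejected with requeue=false on this queue are routed to some.dlx
        val inQueue = QueueBuilder.durable("some.in")
            .deadLetterExchange("some.dlx")
            .deadLetterRoutingKey("some.dlq")
            .build()

        return Declarables(
            dlx,
            dlq,
            inQueue,
            BindingBuilder.bind(dlq).to(dlx).with("some.dlq"),
        )
    }
}
```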

Workaround

Let the RabbitListenerErrorHandler for these async listeners ack/nack the message directly on the channel instead of returning null or throwing an exception. NOTE: you then cannot reuse that error handler for sync listeners.
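
A rough sketch of what such a handler could look like, using the same `handleError` signature as in the reproduction above. The bean name `asyncErrorHandler` is made up for illustration. Returning `null` after the nack is an assumption, since the report does not say what the handler should return once it has settled the message itself.

```kotlin
import com.rabbitmq.client.Channel
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
import org.springframework.stereotype.Component

// Hypothetical handler intended only for async listeners (Mono, Future, suspend).
@Component("asyncErrorHandler")
class AsyncErrorHandler : RabbitListenerErrorHandler {
    override fun handleError(
        amqpMessage: Message,
        channel: Channel,
        message: org.springframework.messaging.Message<*>?,
        exception: ListenerExecutionFailedException,
    ): Any? {
        // Reject this single delivery without requeueing, so the broker
        // dead-letters it if the queue has a dead-letter exchange.
        channel.basicNack(
            amqpMessage.messageProperties.deliveryTag,
            false, // multiple: only this delivery tag
            false, // requeue: do not put the message back on the queue
        )
        // Assumption: nothing further to hand back to the container.
        return null
    }
}
```

The listener would then reference it with `errorHandler = "asyncErrorHandler"`. Because the handler settles the delivery itself, it should not be shared with sync listeners, where the container settles the message as well.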
