Question: how to handle redelivery #588

Open
JBastiaan opened this issue Jul 22, 2024 · 3 comments

Comments

@JBastiaan

JBastiaan commented Jul 22, 2024

Hello,

I'm using amqpnetlite to connect to an ActiveMQ Classic broker. I'm struggling with how to implement redelivery and move failed messages to a DLQ.
Specifically, this is about the case where an exception occurs while processing a message in OnMessage. From what I can see in the API, I can choose to Reject, Release, or Modify a message. I've tried Reject: the outcome is that ActiveMQ marks the message as dequeued. I've also tried Release: I seem to get unlimited redeliveries, and the redelivery count in ActiveMQ only increases when I quit the app.
The same goes for Modify: I get unlimited redeliveries, and the delivery count keeps increasing on the consumer side but not on the broker side.

Expected behavior:
The redelivery count increases on the ActiveMQ broker side, so that the message is eventually moved automatically to a configured DLQ.

What am I missing here?

Edit:
I also tried publishing the message to the DLQ myself using a separate ISenderLink, but I get an exception saying: "Message was not received by this link"

My consumer code:


internal class QueueConsumer<T> : IHostedService
    where T : class
{
    private readonly ILogger<QueueConsumer<T>> _logger;
    private readonly IServiceScopeFactory _serviceScopeFactory;

    private readonly Lazy<Task<IReceiverLink>> _receiver;

    internal QueueConsumer(
        ILogger<QueueConsumer<T>> logger,
        IConnectionAdapter connectionAdapter,
        IServiceScopeFactory serviceScopeFactory,
        string queueName)
    {
        _logger = logger;
        _serviceScopeFactory = serviceScopeFactory;
        _receiver = connectionAdapter.CreateQueueReceiver(
            $"Read.{queueName}",
            $"queue://{queueName}");
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var receiver = await _receiver.Value;
        receiver.Start(1, OnMessage);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var receiver = await _receiver.Value;
        await receiver.CloseAsync();
    }

    private async void OnMessage(IReceiverLink receiver, Message message)
    {
        try
        {
            using var body = new MemoryStream((message.Body as byte[])!);
            var deserializer = new XmlSerializer(typeof(T));
            var msg = deserializer.Deserialize(body) as T ??
                      throw new SerializationException(
                          $"Failed to deserialize message of type {typeof(T).FullName}");

            await using var scope = _serviceScopeFactory.CreateAsyncScope();
            var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
            await mediator.Publish(new ActiveMqMessage<T>
            {
                MessageProperties = message.Properties,
                Message = msg
            });

            receiver.Accept(message);
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Failed to process message of type {Type}", typeof(T).FullName);

            try
            {
                // Also tried Reject and Release here.
                // Also tried the following (throws "Message was not received by this link"):
                // var dlqSender = await _dlqSender.Value;
                // await dlqSender.SendAsync(message);
                // receiver.Accept(message);

                receiver.Modify(
                    message,
                    deliveryFailed: true,
                    undeliverableHere: false,
                    new Fields());
            }
            catch (Exception ex)
            {
                // If we end up here, it might mean we failed to settle the message and
                // can't receive more messages because our credit is exhausted.
                _logger.LogCritical(ex,
                    "Failed to send modification of failed message to peer of type {Type}",
                    typeof(T).FullName);
            }
        }
    }
}

@xinchen10
Member

Try undeliverableHere: true in the Modify call. It worked for me. The message was moved to the ActiveMQ.DLQ queue.

Some ActiveMQ broker configurations can also affect redelivery and dead-lettering behavior. For details, see https://activemq.apache.org/components/classic/documentation/message-redelivery-and-dlq-handling

@JBastiaan
Author

JBastiaan commented Jul 23, 2024

Hey, thanks for your response. I figured out why the message wasn't moved to the DLQ: I was not setting the Durable header to true when publishing my test messages. Now, when I reject a message or modify it with undeliverableHere set to true, it gets moved to the DLQ.

However, I don't entirely understand yet why receiver.Release(message) and receiver.Modify(message) don't seem to get synced to the broker. When I release, I just get infinite retries with the same delivery count. If I modify as in the snippet I posted, the delivery count does increase, but that does not seem to get synced to the broker, and I basically get infinite retries as well.

Is it expected that we check the delivery count on the consumer side and then reject the message or mark it as undeliverable here so that it moves to the DLQ? I would have expected the broker to move the message to the DLQ after the consumer marked it as delivery-failed a certain number of times.

Note:
I'm using the out-of-the-box ActiveMQ configuration; the only thing I've modified is this:

                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="25mb" queuePrefetch="1" reduceMemoryFootprint="true">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                        <!-- ICC standards: Message Redelivery and DLQ Handling
                            http://activemq.apache.org/message-redelivery-and-dlq-handling.html
                        -->
                        <deadLetterStrategy>
                            <!--
                            Use the suffix '.DLQ' for the destination name, and make
                            the DLQ a queue rather than a topic
                            -->
                            <individualDeadLetterStrategy topicSuffix=".DLQ" topicPrefix="" queueSuffix=".DLQ" queuePrefix="" useQueueForQueueMessages="true"/>
                        </deadLetterStrategy>
                    </policyEntry>
                </policyEntries>

//Johan

@xinchen10
Member

The released and modified outcomes are explained in the specification (sections 3.4.4 and 3.4.5):
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf

It is expected that you get the same message repeatedly for released and modified with undeliverableHere = false.

Deadlettering based on delivery-count is broker specific. It might be better to ask the ActiveMQ community how that is handled.
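
As a minimal sketch of the consumer-side approach asked about above (not a recommended or official pattern): read the delivery count from the message header and switch to undeliverableHere: true once a limit is reached. FailureHandling, MaxDeliveryAttempts, ShouldGiveUp and SettleFailed are hypothetical names. The sketch assumes the AMQPNetLite package, that the delivery count seen by the consumer increases after each Modify with deliveryFailed: true, and that the broker dead-letters on undeliverableHere: true as reported in this thread.

```csharp
using Amqp;
using Amqp.Types;

internal static class FailureHandling
{
    // Hypothetical limit; pick a value that suits your workload.
    internal const uint MaxDeliveryAttempts = 5;

    // delivery-count is the number of prior failed delivery attempts,
    // so the attempt that just failed is number deliveryCount + 1.
    // The cast to ulong avoids overflow at uint.MaxValue.
    internal static bool ShouldGiveUp(uint deliveryCount, uint maxAttempts) =>
        (ulong)deliveryCount + 1 >= maxAttempts;

    internal static void SettleFailed(IReceiverLink receiver, Message message)
    {
        // Header can be null when no header section was sent.
        uint deliveryCount = message.Header?.DeliveryCount ?? 0u;
        bool giveUp = ShouldGiveUp(deliveryCount, MaxDeliveryAttempts);

        // Below the limit: ask for redelivery with an incremented count.
        // At the limit: mark the message as undeliverable here, which is
        // the outcome that led to dead-lettering in the tests above.
        receiver.Modify(
            message,
            deliveryFailed: true,
            undeliverableHere: giveUp,
            new Fields());
    }
}
```

In the catch block of OnMessage, the Modify call could then be replaced with FailureHandling.SettleFailed(receiver, message). Whether the broker actually dead-letters the message still depends on its configuration and on the message being persistent.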

By default, ActiveMQ only considers persistent messages for dead-lettering. In my testing, I added the following to the broker config:

                <policyEntry queue=">">
                  <deadLetterStrategy>
                    <sharedDeadLetterStrategy processNonPersistent="true"/>
                  </deadLetterStrategy>
                </policyEntry>

I think setting the durable header on the message has the same effect.
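
For reference, a minimal sketch of publishing with the durable header set, assuming the AMQPNetLite package. DurableMessages and Create are hypothetical names; the body is passed as a byte array to match the consumer code earlier in the thread.

```csharp
using Amqp;
using Amqp.Framing;

internal static class DurableMessages
{
    // Builds a message whose header marks it as durable, so the broker
    // treats it as persistent.
    internal static Message Create(byte[] body) =>
        new Message(body)
        {
            Header = new Header { Durable = true }
        };
}
```

A sender would then call something like await sender.SendAsync(DurableMessages.Create(bytes)), where sender is an existing ISenderLink and bytes is the serialized payload (both assumed here).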
