
Transient processing errors

Artem Utkin edited this page May 28, 2023 · 1 revision


The library can handle transient errors that may occur in your controller during message processing.

An error is transient when retrying the same operation a little later is likely to succeed. For example, an external API called during message processing responds with a 503 (Service Unavailable) status code; there is a good chance the request will succeed if it is retried shortly afterwards.

Unhandled exceptions thrown by your ProcessAsync method are treated as non-transient by default, and message processing is not retried.

If you know that some of the exceptions thrown by your controller are transient, catch them and throw a TransientException instead:

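```csharp
using System.Threading;
using System.Threading.Tasks;
// plus the library's own namespace for IKafkaController<T>, MessageInfo<T> and TransientException

public class MyController : IKafkaController<MyMessage>
{
    public async Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        try
        {
            // process your messages; await asynchronous calls here
            // so that their exceptions are caught below
        }
        catch (SomeException ex)
        {
            // rethrowing as TransientException marks the error as transient,
            // so the library retries instead of treating it as non-transient
            throw new TransientException(ex);
        }
    }
}
```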
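Which exceptions count as transient is your decision; the library only reacts to the TransientException you throw. As one possible sketch (assuming .NET 5 or later, where `HttpRequestException.StatusCode` is available), a hypothetical `TransientErrors.IsTransient` helper could encode the 503 example from above. Treating 502, 504 and `TimeoutException` as transient as well is an assumption of this sketch, not something the library prescribes.

```csharp
using System;
using System.Net;
using System.Net.Http;

// Hypothetical helper (not part of the library): decides which exceptions
// your application chooses to treat as transient.
public static class TransientErrors
{
    public static bool IsTransient(Exception ex) => ex switch
    {
        // 503 Service Unavailable: the example used on this page.
        HttpRequestException { StatusCode: HttpStatusCode.ServiceUnavailable } => true,
        // Assumption: gateway errors usually clear up after a short wait too.
        HttpRequestException { StatusCode: HttpStatusCode.BadGateway or HttpStatusCode.GatewayTimeout } => true,
        // Assumption: timeouts are worth retrying.
        TimeoutException => true,
        // Everything else stays non-transient.
        _ => false,
    };
}
```

In the controller you could then replace `catch (SomeException ex)` with `catch (Exception ex) when (TransientErrors.IsTransient(ex))` and keep `throw new TransientException(ex);` in its body; any exception the filter rejects propagates unchanged and is therefore handled as non-transient.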

When a TransientException is thrown, the consumer seeks back to the previously committed offsets and accumulates the same messages again. This keeps the consumer "alive" from Kafka's point of view and prevents it from being kicked out of the consumer group during long retries.

The re-accumulated messages are then passed to your controller again. As a result, the same messages can be processed more than once.

By default, the consumer pauses for 5 seconds before re-consuming, but you can change this interval in appsettings.json:

```json
{
  "Kafka": {
    "ConsumerGroups": [{
      "GroupId": "<group-1>",
      "ErrorHandling": {
        "PauseAfterTransientErrorMs": 10000 // <-- supply your value in milliseconds (10000 is only an example)
      }
    }]
  }
}
```
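The retry cycle described above (rewind to the last committed offset, pause, re-accumulate, hand the same batch to the controller again) can be illustrated with a simplified, single-partition, in-memory model. This is a sketch under stated assumptions, not the library's implementation: `RewindModel` and `SimulatedTransientException` are hypothetical names, and the `pause` parameter merely stands in for `PauseAfterTransientErrorMs`.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical stand-in for the library's TransientException, so the sketch compiles on its own.
public class SimulatedTransientException : Exception
{
    public SimulatedTransientException(Exception inner) : base(inner.Message, inner) { }
}

// Hypothetical model of "rewind and re-accumulate" on one partition (NOT the library's code).
public static class RewindModel
{
    public static void Run(int[] partition, int batchSize, TimeSpan pause, Action<int[]> process)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        int committed = 0; // index of the first message that has not been committed yet
        while (committed < partition.Length)
        {
            // Accumulate a batch starting at the last committed offset.
            int[] batch = partition.Skip(committed).Take(batchSize).ToArray();
            try
            {
                process(batch);
                committed += batch.Length; // success: "commit" the whole batch
            }
            catch (SimulatedTransientException)
            {
                // Transient failure: the committed offset stays unchanged, so after
                // the pause the same messages are accumulated and delivered again.
                Thread.Sleep(pause);
            }
            // In this model, any other exception propagates and is not retried.
        }
    }
}
```

For example, feeding the messages 10 to 14 in batches of 3 to a controller that fails transiently on its first call makes the controller see 10, 11, 12, 10, 11, 12, 13, 14: the first batch is delivered twice, which is the duplicate processing described above.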