Transient processing errors
The library can handle transient errors that may occur in your controller during message processing.
An error is transient when retrying the operation may succeed without hitting the same error. For example, an external API might respond with a 503 (Service Unavailable) status code during message processing. There is a good chance that the request will succeed if retried shortly after.
Unhandled exceptions thrown by your ProcessAsync method are treated as non-transient by default. Message processing is not retried in this case.
If you know that some of the exceptions thrown by your controller are transient, you can catch them and throw a TransientException instead:
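public class MyController : IKafkaController<MyMessage>
{
    // async lets the method satisfy its Task return type without an explicit return
    public async Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        try
        {
            // process your messages (await your asynchronous work here)
        }
        catch (SomeException ex)
        {
            // wrapping the original exception marks the failure as transient:
            throw new TransientException(ex);
        }
    }
}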
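To decide which failures deserve a TransientException, it can help to keep the classification in one place. The following is a minimal sketch, not part of the library: the `TransientErrors` class, the `IsTransientStatus` helper, and the chosen list of status codes are assumptions made for illustration, and you would adjust the list to the APIs you actually call.

```csharp
using System.Net;

// Hypothetical helper (not provided by the library): decides whether an HTTP
// status code looks like a temporary condition that is worth retrying.
public static class TransientErrors
{
    // The set of codes below is an assumption; adjust it to your dependencies.
    public static bool IsTransientStatus(HttpStatusCode? status) =>
        status is HttpStatusCode.RequestTimeout       // 408
               or HttpStatusCode.TooManyRequests      // 429
               or HttpStatusCode.BadGateway           // 502
               or HttpStatusCode.ServiceUnavailable   // 503
               or HttpStatusCode.GatewayTimeout;      // 504
}
```

In a controller, such a helper could be used in an exception filter, for example `catch (HttpRequestException ex) when (TransientErrors.IsTransientStatus(ex.StatusCode))`, followed by `throw new TransientException(ex);`. Note that `HttpRequestException.StatusCode` is only available from .NET 5 onward.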
In this case, the consumer jumps back to the previously committed offsets and accumulates the same messages again. This keeps the consumer "alive" from Kafka's point of view, so it is not kicked out of the consumer group during long retries.
The accumulated messages are then passed to your controller again, which results in duplicate message processing.
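Because the same messages can reach your controller more than once, it may be worth skipping the ones that were already handled before the transient failure. Below is a rough sketch of one way to do that, under loud assumptions: the `ProcessedOffsets` class is hypothetical, it assumes you can obtain the topic, partition, and offset of each message (the shape of `MessageInfo<T>` is not shown in this document), and it keeps state in memory only, so it does not survive restarts or rebalances and is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory tracker (not part of the library): remembers the
// highest offset that was fully processed for each topic partition.
public sealed class ProcessedOffsets
{
    private readonly Dictionary<(string Topic, int Partition), long> _last = new();

    // True when this offset was already handled during an earlier attempt.
    public bool IsProcessed(string topic, int partition, long offset) =>
        _last.TryGetValue((topic, partition), out var last) && offset <= last;

    // Call after a message has been processed successfully.
    public void MarkProcessed(string topic, int partition, long offset)
    {
        var key = (topic, partition);
        _last[key] = _last.TryGetValue(key, out var last) ? Math.Max(last, offset) : offset;
    }
}
```

A controller could check `IsProcessed` before handling each message and call `MarkProcessed` afterwards. For effects that must not be repeated across restarts, a durable store or an idempotent downstream operation would be the safer choice.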
By default, the consumer pauses for 5 seconds before re-consuming, but you can change this interval in appsettings.json:
{
    "Kafka": {
        "ConsumerGroups": [{
            "GroupId": "<group-1>",
            "ErrorHandling": {
                "PauseAfterTransientErrorMs": xxx // <-- supply your value (in milliseconds)
            }
        }]
    }
}