-
Notifications
You must be signed in to change notification settings - Fork 0
Logging and intake observer
The library uses standard Microsoft.Extensions.Logging.ILogger<T>
logger to write logs.
Additionally, you can observe every intake of messages individually. This might be useful when you want to create individual log documents for every intake of messages.
If it's your case, implement your observer by inheriting KafkaIntakeObserver<TMessage>
class:
public class MyIntakeObserver : KafkaIntakeObserver<MyMessage>
{
}
The base class contains numerous methods that you can override in order to observe different kind of events:
- OnConsumeError
- OnNothingToProcess*
- OnMessagesCollected
- OnProcessingFinished
- OnProcessingException
- OnDeadLettering
- OnDeadLetteringFinished
- OnDeadLetteringFailed
- OnCommitted
- OnCommitError
* This method is called when the intake strategy signalled the intake cancellation but no messages were collected by that moment. This can happen when FixedInterval or MaxSizeWithTimeout intake strategy is used, see also Intake and intake strategies.
A new instance of your intake observer is created for every intake of messages and disposed at the end. There is a Dispose
method that you can override if you need to dispose resources.
Register your intake observer in the consumer group options (Program.cs):
services.AddKafkaEventLoop(ctx.Configuration, o => o
.HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
.HasMessageType<MyMessage>()
.HasJsonMessageDeserializer()
.HasController<MyController>()
.HasCustomIntakeObserver<MyIntakeObserver>() // <-- registration
.Build())
.Build());