Skip to content

Logging and intake observer

Artem Utkin edited this page May 28, 2023 · 1 revision

Logging and intake observer

Logging

The library uses standard Microsoft.Extensions.Logging.ILogger<T> logger to write logs.

Intake observer

Additionally, you can observe every intake of messages individually. This might be useful when you want to create individual log documents for every intake of messages.

If it's your case, implement your observer by inheriting KafkaIntakeObserver<TMessage> class:

public class MyIntakeObserver : KafkaIntakeObserver<MyMessage>
{
}

The base class contains numerous methods that you can override in order to observe different kind of events:

  • OnConsumeError
  • OnNothingToProcess*
  • OnMessagesCollected
  • OnProcessingFinished
  • OnProcessingException
  • OnDeadLettering
  • OnDeadLetteringFinished
  • OnDeadLetteringFailed
  • OnCommitted
  • OnCommitError

* This method is called when the intake strategy signalled the intake cancellation but no messages were collected by that moment. This can happen when FixedInterval or MaxSizeWithTimeout intake strategy is used, see also Intake and intake strategies.

Lifetime

A new instance of your intake observer is created for every intake of messages and disposed at the end. There is a Dispose method that you can override if you need to dispose resources.

Registration

Register your intake observer in the consumer group options (Program.cs):

services.AddKafkaEventLoop(ctx.Configuration, o => o
  .HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
    .HasMessageType<MyMessage>()
    .HasJsonMessageDeserializer()
    .HasController<MyController>()
    .HasCustomIntakeObserver<MyIntakeObserver>() // <-- registration
    .Build())
  .Build());
Clone this wiki locally