
Controller and message processing

Artem Utkin edited this page May 28, 2023 · 1 revision


A controller is the component responsible for processing messages.

Once the library has accumulated the next intake (batch) of messages, it passes them to your controller.

How to create a controller

1. Create a controller class

To create a controller, implement IKafkaController<TMessage>:

public class MyController : IKafkaController<MyMessage>
{
    public Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        // process your messages
        // (if you await I/O here, declare the method `async` instead)
        return Task.CompletedTask;
    }
}

You only need to implement one method, ProcessAsync. It receives an array of messages (the intake) and a cancellation token, which is signaled when the application is requested to stop.

MessageInfo<TMessage>

This record holds the message itself along with metadata describing the Kafka message:

  • Value - the message instance.
  • EventTimeUtc - the message timestamp. Depending on your broker or topic configuration (message.timestamp.type, CreateTime by default), this is either:
    • CreateTime - the time the producer created the record, before sending it, or
    • LogAppendTime - the time the broker appended the record to the Kafka log.
  • Topic - the topic name.
  • Partition - the partition.
  • Offset - the message offset.

2. Register your controller (Program.cs):

services.AddKafkaEventLoop(ctx.Configuration, o => o
    .HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
        .HasMessageType<MyMessage>()
        .HasJsonMessageDeserializer()
        .HasController<MyController>() // <---- controller registration
        .Build())
    .Build());

Controller behavior explained

Return when processing is finished

The ProcessAsync method returns a task that must complete only after the given messages have been successfully processed. In other words, await every external call (database, API, etc.) before returning from ProcessAsync.

Do not process messages on a separate task and return from the method before the actual processing is done. This can lead to undesirable side effects:

  • The consumer commits the message offsets before processing has actually finished. If your service crashes or stops in the meantime, those messages will not be consumed again after the service restarts.
  • The consumer creates a new controller instance and passes it the next intake while the previous instance is still processing the previous intake. This overlapping work may exhaust your host's resources.
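The difference between the two approaches can be sketched in plain C# (top-level statements, C# 9 or later). SaveAsync, ProcessAwaitedAsync and ProcessFireAndForget are hypothetical names, not part of the library, and plain strings stand in for MessageInfo<TMessage> so the example runs on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Records which (hypothetical) messages have been persisted.
var saved = new List<string>();

// Hypothetical stand-in for an external call (database, HTTP API, ...):
// a message counts as saved only after the simulated I/O completes.
async Task SaveAsync(string message, CancellationToken token)
{
    await Task.Delay(100, token); // simulated I/O latency
    lock (saved) saved.Add(message);
}

// Correct: the returned task completes only after every message is saved,
// so the consumer can safely commit offsets afterwards.
async Task ProcessAwaitedAsync(string[] messages, CancellationToken token)
{
    foreach (var message in messages)
        await SaveAsync(message, token);
}

// Anti-pattern: processing starts on a separate task and the method returns
// immediately, so the consumer would treat the intake as done (and commit
// its offsets) while the saves are still in flight.
Task ProcessFireAndForget(string[] messages, CancellationToken token)
{
    _ = Task.Run(() => ProcessAwaitedAsync(messages, token));
    return Task.CompletedTask;
}

await ProcessAwaitedAsync(new[] { "a", "b" }, CancellationToken.None);
lock (saved) Console.WriteLine($"Saved after awaiting: {string.Join(", ", saved)}");
// prints "Saved after awaiting: a, b"

await ProcessFireAndForget(new[] { "c", "d" }, CancellationToken.None);
// Returns right away: "c" and "d" are most likely not saved yet at this point.
```

In a real controller, the body of ProcessAsync should follow the ProcessAwaitedAsync shape: await the work, then return.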

Cancellation token

The cancellation token is signaled when the application is stopping. Respect it: pass it to the asynchronous calls you make, and cancel message processing where possible once it is signaled.
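A minimal sketch of a processing loop that respects the token, again in plain C# with hypothetical HandleAsync and ProcessAsync stand-ins (Task.Delay simulates an I/O call). How the library treats an OperationCanceledException thrown from your controller during shutdown is not described on this page, so check that before relying on it:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical per-message work. Task.Delay stands in for an I/O call that
// accepts a CancellationToken, as most .NET database and HTTP APIs do.
async Task HandleAsync(string message, CancellationToken token)
{
    await Task.Delay(50, token);
}

// Respecting cancellation: check the token between messages and pass it
// to every awaited call, so in-flight I/O is aborted as well.
async Task ProcessAsync(string[] messages, CancellationToken token)
{
    foreach (var message in messages)
    {
        token.ThrowIfCancellationRequested();
        await HandleAsync(message, token);
    }
}

// Simulate a shutdown request arriving while the intake is being processed:
// six messages take at least 300 ms, but the token fires after 120 ms.
using var shutdown = new CancellationTokenSource(TimeSpan.FromMilliseconds(120));
var intake = new[] { "m1", "m2", "m3", "m4", "m5", "m6" };
var stoppedEarly = false;
try
{
    await ProcessAsync(intake, shutdown.Token);
}
catch (OperationCanceledException)
{
    // Task.Delay throws TaskCanceledException, which derives from OperationCanceledException.
    stoppedEarly = true;
}
Console.WriteLine(stoppedEarly ? "Stopped early on shutdown." : "Finished all messages.");
```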

Lifetime

A new instance of your controller will be created for each intake of messages.

Thread-safety

A single controller instance is never called by multiple threads in parallel. Even when you run multiple parallel consumers, each of them accumulates its own intake of messages and creates a separate instance of your controller.
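One consequence of the lifetime and thread-safety rules: state kept in controller instance fields lives for only one intake and needs no locking, but anything shared between instances (for example, a static field or a singleton service; how you share it is up to you and is assumed here) can be accessed by parallel consumers at the same time. The sketch below simulates four parallel consumers with a hypothetical RunControllerForIntake function and updates the shared counter with Interlocked:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Shared across all controller instances (in a real app, e.g. a singleton
// service or a static field). Parallel consumers may update it at the same
// time, so it is incremented atomically with Interlocked.
long processedTotal = 0;

// Simulates one consumer handing one intake to a fresh controller instance.
// `processedInIntake` plays the role of an instance field: it starts at zero
// for every intake and is used by one thread only, so it needs no locking.
Task<int> RunControllerForIntake(int[] intake) => Task.Run(() =>
{
    var processedInIntake = 0;
    foreach (var message in intake)
    {
        processedInIntake++;                       // per-instance state
        Interlocked.Increment(ref processedTotal); // shared state: atomic update
    }
    return processedInIntake;
});

// Four parallel consumers, each with its own intake of 1000 messages.
var intakes = Enumerable.Range(0, 4)
    .Select(_ => Enumerable.Range(0, 1000).ToArray())
    .ToArray();
var perIntake = await Task.WhenAll(intakes.Select(i => RunControllerForIntake(i)));

Console.WriteLine($"Per intake: {string.Join(", ", perIntake)}; total: {processedTotal}");
// prints "Per intake: 1000, 1000, 1000, 1000; total: 4000"
```

Replacing Interlocked.Increment with processedTotal++ would make the total unreliable, because the increments from parallel consumers could interleave.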

Duration of the ProcessAsync method

In general, you should process the messages as fast as possible and return from the ProcessAsync method.

If processing an intake can take longer than 5 minutes (Kafka's default max.poll.interval.ms), you may need to increase the MaxPollIntervalMs setting. Otherwise Kafka considers the consumer dead and removes it from the consumer group, so when processing finally finishes, the consumer fails to commit the offsets. The consumer then restarts, and you may consume the same messages again.

You can increase MaxPollIntervalMs in the consumer group options (Program.cs). Make sure the value is greater than the maximum expected time to process one intake.

services.AddKafkaEventLoop(ctx.Configuration, o => o
    .HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
        .HasMessageType<MyMessage>()
        .HasJsonMessageDeserializer()
        .HasController<MyController>()
        .HasKafkaConfig(c => c.MaxPollIntervalMs = 900000)  // <---- 15 minutes
        .Build())
    .Build());
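A small worked example of choosing the value, assuming you know the worst-case time to process one intake. MaxPollIntervalMsFor and its 1.5 safety factor are hypothetical choices, not library defaults:

```csharp
using System;

// Hypothetical helper: derive MaxPollIntervalMs from the worst-case time
// needed to process one intake, plus a safety margin. Kafka's default
// max.poll.interval.ms is 300000 ms (5 minutes).
int MaxPollIntervalMsFor(TimeSpan worstCaseProcessing, double safetyFactor = 1.5)
{
    if (safetyFactor < 1.0)
        throw new ArgumentOutOfRangeException(nameof(safetyFactor), "The margin must not shrink the interval.");
    return (int)Math.Ceiling(worstCaseProcessing.TotalMilliseconds * safetyFactor);
}

// A 10-minute worst case with a 50% margin gives 900000 ms (15 minutes).
Console.WriteLine(MaxPollIntervalMsFor(TimeSpan.FromMinutes(10))); // prints 900000
```

You could then pass the result to HasKafkaConfig, as in the registration example above.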

Exceptions

If your controller throws an unhandled exception, it is treated as non-transient. Depending on your configuration, this may stop the consumer or lead to dead-lettering.

To signal that an error is transient (that is, the operation can be retried), catch it and throw a TransientException that wraps it:

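public class MyController : IKafkaController<MyMessage>
{
    public async Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        try
        {
            // process your messages; await the work inside this try block
            // so that asynchronous failures reach the catch below
        }
        catch (SomeException ex) // SomeException: whichever error type you consider retryable
        {
            throw new TransientException(ex);
        }
    }
}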

In this case, the consumer will be restarted after a delay and consume the same messages again.
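Deciding which errors count as transient is up to you. As a sketch, a hypothetical IsTransient helper could encode that policy; the exception types chosen here (timeouts, HTTP and I/O errors) are an assumption, not something the library prescribes:

```csharp
using System;
using System.IO;
using System.Net.Http;

// Hypothetical retry policy: which exceptions are worth retrying?
// Timeouts and network/I/O failures often succeed on a later attempt;
// anything else (e.g. a FormatException caused by bad message content)
// is treated as non-transient, because retrying the same data would fail again.
bool IsTransient(Exception ex) => ex switch
{
    TimeoutException => true,
    HttpRequestException => true,
    IOException => true,
    _ => false,
};

Console.WriteLine(IsTransient(new TimeoutException())); // prints True
Console.WriteLine(IsTransient(new FormatException()));  // prints False
```

In the controller, such a helper fits into an exception filter: catch (Exception ex) when (IsTransient(ex)) { throw new TransientException(ex); }. Exceptions that do not match the filter propagate unchanged and are handled as non-transient.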

See also Error handling.
