Controller and message processing
A controller is the component responsible for message processing. When the next intake of messages has been accumulated, the library passes it to your controller.
You create a controller by implementing IKafkaController<TMessage>:
public class MyController : IKafkaController<MyMessage>
{
    public Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        // process your messages
        return Task.CompletedTask;
    }
}
You must implement only one method. It receives an array of messages (the intake) and a cancellation token; the token is signaled when the application is requested to stop.
Each MessageInfo<TMessage> record holds the message itself and additional metadata describing the Kafka message:
- Value - the message instance,
- EventTimeUtc - the message timestamp. This value can be either CreateTime (the time when the producer record is created, before sending) or LogAppendTime (the time when the record was appended to the Kafka log on the broker); which one you get depends on your broker or topic configuration (the default is CreateTime),
- Topic - the topic name,
- Partition - the partition,
- Offset - the message offset.
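For illustration, here is a minimal sketch of a controller that reads these fields. The property names come from the list above; the Console.WriteLine logging and the assumption that EventTimeUtc is a DateTime are placeholders for your own processing logic.
public class MyController : IKafkaController<MyMessage>
{
    public Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        foreach (var message in messages)
        {
            // Value is the deserialized message instance.
            MyMessage payload = message.Value;

            // The remaining properties describe where and when the Kafka record was produced.
            Console.WriteLine(
                $"{message.Topic}[{message.Partition}]@{message.Offset} " +
                $"produced at {message.EventTimeUtc:O}: {payload}");
        }

        return Task.CompletedTask;
    }
}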
Register your controller in the consumer group options (Program.cs):
services.AddKafkaEventLoop(ctx.Configuration, o => o
    .HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
        .HasMessageType<MyMessage>()
        .HasJsonMessageDeserializer()
        .HasController<MyController>() // <---- controller registration
        .Build())
    .Build());
The ProcessAsync method returns a task that must complete only when the given messages have been successfully processed. In other words, you must wait for any external calls to the database, API, etc. before returning from ProcessAsync.
Do not try to process messages in a separate task and return from the method before the actual message processing is done. This might lead to undesirable side effects:
- the consumer will commit the message offsets before you have actually finished processing them. If your service crashes or stops, you won't be able to consume these messages again after the service restarts.
- the consumer will create a new instance of the controller and pass it the next intake of messages while the previous instance is still processing the previous one. This may exhaust your host resources.
The cancellation token is signaled when the application is being stopped. You should respect cancellation and stop your message processing where possible.
A new instance of your controller will be created for each intake of messages.
Your controller will not be called by multiple threads in parallel. Even when you have multiple parallel consumers, each of them accumulates its own intake of messages and creates a separate instance of your controller.
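To illustrate both points, here is a minimal sketch that awaits its external work before returning and honors the cancellation token. SaveToDatabaseAsync is a hypothetical helper introduced only for this example; it is not part of the library.
public class MyController : IKafkaController<MyMessage>
{
    public async Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        foreach (var message in messages)
        {
            // Stop early if the application is shutting down.
            token.ThrowIfCancellationRequested();

            // Await the external call so the returned task completes only
            // after the message has really been processed.
            await SaveToDatabaseAsync(message.Value, token);
        }
    }

    // Hypothetical persistence helper used only for this sketch.
    private static Task SaveToDatabaseAsync(MyMessage message, CancellationToken token)
        => Task.CompletedTask;
}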
In general, you should process the messages as fast as possible and return from the ProcessAsync method.
In case of long-running message processing, when it takes more than 5 minutes to process the intake, you may need to increase the maxPollIntervalMs Kafka setting. Otherwise the consumer will be considered "dead" by Kafka, and once you finish the message processing, the consumer will fail to commit the offsets. This leads to a consumer restart, and you might consume the same messages once again.
You can increase maxPollIntervalMs in the consumer group options (Program.cs). Make sure this value is greater than the maximum expected duration of your message processing.
services.AddKafkaEventLoop(ctx.Configuration, o => o
    .HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
        .HasMessageType<MyMessage>()
        .HasJsonMessageDeserializer()
        .HasController<MyController>()
        .HasKafkaConfig(c => c.MaxPollIntervalMs = 900000) // <---- 15 minutes
        .Build())
    .Build());
If your controller throws an unhandled exception, it will be considered non-transient. This may lead to a consumer stop or dead-lettering, depending on your configuration.
If you want to signal that the error is transient (it can be retried), wrap it in a TransientException:
public class MyController : IKafkaController<MyMessage>
{
    public Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
    {
        try
        {
            // process your messages
            return Task.CompletedTask;
        }
        catch (SomeException ex)
        {
            // signal that the error is transient and can be retried
            throw new TransientException(ex);
        }
    }
}
In this case, the consumer will be restarted after a delay and consume the same messages again.
See also Error handling.