-
Notifications
You must be signed in to change notification settings - Fork 0
Installation
Artem Utkin edited this page May 28, 2023
·
1 revision
Important: the library is compatible with .NET 6 and higher.
-
Kafka.EventLoop
- if you use the default IoC-container provided by Microsoft. -
Kafka.EventLoop.Autofac
- if you use Autofac IoC-container.
2. Create your controller class and implement IKafkaController<TMessage>
interface, where TMessage
is a type of messages that you want to consume and process.
Example:
public class MyController : IKafkaController<MyMessage>
{
public Task ProcessAsync(MessageInfo<MyMessage>[] messages, CancellationToken token)
{
// process your messages
}
}
Read more about kafka controllers.
Host
.CreateDefaultBuilder(args)
.ConfigureServices((ctx, services) =>
{
services.AddKafkaEventLoop(ctx.Configuration, o => o
.HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
.HasMessageType<MyMessage>()
.HasJsonMessageDeserializer()
.HasController<MyController>()
.Build())
.Build());
})
.Build()
.Run();
Host
.CreateDefaultBuilder(args)
.UseServiceProviderFactory(new AutofacServiceProviderFactory())
.ConfigureContainer<ContainerBuilder>((ctx, builder) =>
{
builder.AddKafkaEventLoop(ctx.Configuration, o => o
.HasConsumerGroup("<your-consumer-group-id>", cgOptions => cgOptions
.HasMessageType<MyMessage>()
.HasJsonMessageDeserializer()
.HasController<MyController>()
.Build())
.Build());
})
.ConfigureServices((ctx, services) =>
{
services.AddKafkaHostedService();
})
.Build()
.Run();
Please follow this article.