Lightweight .NET Kafka adapter intended to let an existing codebase publish and consume Kafka messages without taking a direct dependency on Kafka-specific APIs throughout the application.
Provide a small, maintainable adapter library that:
- Exposes application-facing abstractions for publishing and consuming messages
- Wraps Kafka client setup, configuration, serialization, and delivery concerns
- Integrates cleanly with standard .NET dependency injection and hosting
- Keeps application code insulated from Kafka implementation details
The first version should focus on the smallest useful feature set:
- Typed message publishing
- Typed message consumption
- JSON serialization and deserialization
- DI registration for producer and consumer services
- Background consumer hosting for worker services or ASP.NET Core apps
- Basic logging, error handling, and retry behavior
The first version does not need to solve everything:
- Full schema registry support
- Complex stream processing
- Exactly-once delivery guarantees
- Custom admin tooling
- Broad broker management features
This repository now contains the initial solution scaffold, public contracts, producer and consumer implementations, DI registration, typed handler registration helpers, retry/dead-letter behavior, a runnable sample app, and live integration tests against a local Kafka broker.
- Confirmed the library is technically feasible as a lightweight .NET adapter over Kafka
- Defined the high-level goal and initial project scope
- Created the initial project README
- Created the .NET solution and class library structure
- Chose the initial target framework (net8.0)
- Defined the first-pass public abstractions for publishing and consuming
- Defined the first-pass configuration model for brokers, topics, auth, and consumer groups
- Added the Kafka client dependency (Confluent.Kafka)
- Implemented the first producer wrapper for typed messages
- Added the first JSON serialization path for produced messages
- Added DI registration extensions for the producer and consumer paths
- Implemented the consumer worker pattern for typed handlers
- Added JSON deserialization support for consumers
- Added typed handler registration helpers for consuming apps
- Added structured logging around produce/consume failures
- Decided the initial offset commit strategy and failure semantics
- Added retry behavior and dead-letter strategy
- Added local development setup for Kafka
- Added integration tests against a local Kafka instance
- Added sample application showing publish/consume usage
- Added usage documentation and configuration examples
- Narrowed the handler registration API to consumer-facing options
- Added production-oriented failure and authentication guidance
- Added optional certificate-based TLS broker security settings
- Added a TLS-enabled local Kafka integration test path
- Added initial NuGet package metadata and pack/push instructions
- Refined public-release package metadata
- Added operational guidance for monitoring, replay, and dead-letter reprocessing
- Added MIT license metadata for NuGet publication
- Added GitHub Actions release automation for GitHub Releases and NuGet publishing
- Split build/test CI from release automation into separate workflows
- Added fail-fast options validation and built-in metrics for observability
- Decide whether to refactor shared GitHub Actions steps into a reusable workflow or composite action
The likely shape of the library:
- A reusable adapter package
- A small sample app
- Integration tests
- Documentation for setup and usage
The repository includes a single-node Kafka stack for local development in docker-compose.kafka.yml.
Start it with:
docker compose -f docker-compose.kafka.yml up -d
Kafka will be reachable from the host on localhost:9092.
Stop the stack with:
docker compose -f docker-compose.kafka.yml down
To remove the Kafka data volume as well:
docker compose -f docker-compose.kafka.yml down -v
- This uses a single broker in KRaft combined mode for local development only.
- The adapter should use localhost:9092 as BootstrapServers when running on the host machine.
Run the Kafka integration tests against the local broker with:
dotnet test tests/DotNetKafkaAdapter.IntegrationTests/DotNetKafkaAdapter.IntegrationTests.csproj
To skip the TLS-specific integration test explicitly:
DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true dotnet test tests/DotNetKafkaAdapter.IntegrationTests/DotNetKafkaAdapter.IntegrationTests.csproj
To run the TLS integration test as well:
powershell -ExecutionPolicy Bypass -File scripts/generate-kafka-tls-certs.ps1
docker compose -f docker-compose.kafka.tls.yml up -d
dotnet test tests/DotNetKafkaAdapter.IntegrationTests/DotNetKafkaAdapter.IntegrationTests.csproj
The TLS broker listens on localhost:9093 and requires:
- server certificate validation against the generated local CA
- client certificate authentication with the generated client certificate and private key
The TLS-specific test is skipped automatically if the generated certificate assets are not present.
It is also skipped when DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true.
A runnable sample lives in samples/DotNetKafkaAdapter.SampleApp.
Run it against the local broker with:
dotnet run --project samples/DotNetKafkaAdapter.SampleApp/DotNetKafkaAdapter.SampleApp.csproj
Optional environment variables:
- KAFKA_BOOTSTRAP_SERVERS defaults to localhost:9092
- KAFKA_TOPIC defaults to sample.orders
- KAFKA_CONSUMER_GROUP defaults to sample.orders.consumer
- KAFKA_DEAD_LETTER_TOPIC defaults to <topic>.dlq
The sample app:
- creates the sample topic and dead-letter topic if they do not exist
- starts the adapter consumer hosted service
- publishes one OrderSubmitted message
- logs the consumed message and stops the host
The adapter library can be packed from src/DotNetKafkaAdapter.
The project currently packs with:
- package ID DotNetKafkaAdapter
- default version 0.1.0
- the repository README.md included as the NuGet package readme
Create a package locally with:
dotnet pack src/DotNetKafkaAdapter/DotNetKafkaAdapter.csproj -c Release -o artifacts/packages
To set an explicit package version at pack time:
dotnet pack src/DotNetKafkaAdapter/DotNetKafkaAdapter.csproj -c Release -o artifacts/packages /p:Version=0.1.0
The generated .nupkg file will be written to artifacts/packages.
The package metadata is defined in DotNetKafkaAdapter.csproj. If you are preparing a public release, you will likely still want to set or refine:
- Version
- Authors
- release automation and publishing workflow
Push a built package with:
dotnet nuget push artifacts/packages/DotNetKafkaAdapter.<version>.nupkg --source https://api.nuget.org/v3/index.json --api-key <your-api-key>
If you want symbol packages as well, include them when packing:
dotnet pack src/DotNetKafkaAdapter/DotNetKafkaAdapter.csproj -c Release -o artifacts/packages /p:IncludeSymbols=true /p:SymbolPackageFormat=snupkg
The repository includes a CI workflow at .github/workflows/ci.yml.
It runs on pushes to main or master and on pull requests. It will:
- restore and build the solution
- start the plaintext local Kafka Docker stack
- run the integration test suite
This keeps routine validation separate from the release pipeline.
The GitHub CI workflow sets DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true, so the TLS-specific integration test is excluded there.
The repository includes a release workflow at .github/workflows/release.yml.
It will:
- restore, build, and run the integration test suite
- start the plaintext local Kafka stack in Docker
- pack .nupkg and .snupkg artifacts
- create a GitHub Release for the version tag
- push the package to NuGet when the NUGET_API_KEY secret is configured in the release GitHub Environment
The release workflow is intentionally separate from CI so package publication only happens from explicit release events.
The GitHub release workflow also sets DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true, so TLS integration remains available locally but is excluded from GitHub-hosted runs.
The release workflow runs automatically when you push a tag that starts with v, for example:
git tag v0.1.0
git push origin v0.1.0
You can also run it manually from GitHub Actions with the following inputs:
- version
- publish_to_nuget
The release workflow targets the release GitHub Environment. To publish to NuGet from GitHub Actions:
- Create a GitHub Environment named release
- Add an environment secret named NUGET_API_KEY
If that secret is not present, the workflow will still build, test, pack, and create the GitHub Release, but it will skip the NuGet push step.
Typical registration in an application:
using DotNetKafkaAdapter.Abstractions;
using Microsoft.Extensions.DependencyInjection;
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "localhost:9092";
options.ClientId = "my-app";
options.Producer.DefaultTopic = "orders";
});
services.AddKafkaHandler<OrderSubmitted, OrderSubmittedHandler>(
topic: "orders",
consumerGroup: "orders-consumer",
options =>
{
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(1);
options.DeadLetterTopic = "orders.dlq";
});
If you prefer to build options up front instead of registering handlers through DI extensions:
var kafkaOptions = new KafkaAdapterOptions
{
BootstrapServers = "localhost:9092",
ClientId = "my-app"
};
kafkaOptions.Producer.DefaultTopic = "orders";
kafkaOptions.AddConsumer<OrderSubmitted, OrderSubmittedHandler>(
topic: "orders",
consumerGroup: "orders-consumer",
configure: options =>
{
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(1);
options.DeadLetterTopic = "orders.dlq";
});
services.AddKafkaAdapter(kafkaOptions);
services.AddScoped<OrderSubmittedHandler>();
Publish a message:
await publisher.PublishAsync(
"orders",
new OrderSubmitted("order-123", "customer-42", 42.50m),
new PublishOptions
{
Key = "order-123",
MessageId = "order-123"
},
cancellationToken);
Handle a consumed message:
public sealed class OrderSubmittedHandler : IMessageHandler<OrderSubmitted>
{
public Task HandleAsync(
MessageContext context,
OrderSubmitted message,
CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
Current consumer behavior:
- If a handler succeeds and AutoCommit is false, the adapter commits the Kafka offset after handling.
- If deserialization fails, the adapter does not retry in-process. It sends the message to the dead-letter topic if one is configured; otherwise that consumer loop stops.
- If the handler throws, the adapter retries in-process up to MaxRetryAttempts with exponential backoff based on RetryDelay.
- If all retries fail and a dead-letter topic is configured, the adapter publishes a KafkaDeadLetterMessage and then commits the original offset.
- If all retries fail and no dead-letter topic is configured, that consumer loop stops without committing the failed message.
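Because of the retry and dead-letter behavior above, delivery is effectively at-least-once: the same message can reach a handler more than once. A sketch of an idempotent handler, assuming MessageContext exposes a MessageId property and IOrderStore is an application-defined dependency (both names are illustrative, not part of the adapter):

```csharp
// Illustrative only: MessageContext.MessageId and IOrderStore are assumptions.
public sealed class IdempotentOrderSubmittedHandler : IMessageHandler<OrderSubmitted>
{
    private readonly IOrderStore _store;

    public IdempotentOrderSubmittedHandler(IOrderStore store) => _store = store;

    public async Task HandleAsync(
        MessageContext context,
        OrderSubmitted message,
        CancellationToken cancellationToken = default)
    {
        // Skip messages this service has already processed, so in-process
        // retries and later DLQ replays cannot apply the same order twice.
        if (await _store.HasProcessedAsync(context.MessageId, cancellationToken))
        {
            return;
        }

        await _store.SaveOrderAsync(message, cancellationToken);
        await _store.MarkProcessedAsync(context.MessageId, cancellationToken);
    }
}
```

Deduplicating on a stable message ID keeps the handler safe under both retry paths without requiring any transactional support from Kafka itself.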
What this does not guarantee:
- Exactly-once delivery
- Global ordering beyond Kafka partition semantics
- Distributed retries across processes
- Automatic replay or re-drive of dead-lettered messages
Local development usually uses plaintext:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "localhost:9092";
options.ClientId = "my-app";
});
For SASL/PLAIN over TLS:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.SaslSsl;
options.Security.SaslMechanism = KafkaSaslMechanism.Plain;
options.Security.Username = "my-username";
options.Security.Password = "my-password";
});
For SASL/SCRAM over TLS:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.SaslSsl;
options.Security.SaslMechanism = KafkaSaslMechanism.ScramSha512;
options.Security.Username = "my-username";
options.Security.Password = "my-password";
});
For TLS server verification with a custom CA bundle:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.Ssl;
options.Security.SslCaLocation = "/etc/certs/ca.pem";
options.Security.EnableSslCertificateVerification = true;
options.Security.SslEndpointIdentificationAlgorithm = KafkaSslEndpointIdentificationAlgorithm.Https;
});
For mutual TLS with a client certificate and private key:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.Ssl;
options.Security.SslCaLocation = "/etc/certs/ca.pem";
options.Security.SslCertificateLocation = "/etc/certs/client.pem";
options.Security.SslKeyLocation = "/etc/certs/client.key";
options.Security.SslKeyPassword = "optional-key-password";
});
Optional TLS settings currently supported:
- SslCaLocation
- SslCaPem
- SslCaCertificateStores
- SslCertificateLocation
- SslCertificatePem
- SslKeyLocation
- SslKeyPem
- SslKeyPassword
- SslKeystoreLocation
- SslKeystorePassword
- EnableSslCertificateVerification
- SslEndpointIdentificationAlgorithm
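The PEM variants let you keep certificate material out of the filesystem entirely. A sketch using the in-memory settings, where the "Kafka:Tls:*" configuration keys are hypothetical names for wherever your deployment stores secrets (user secrets, a vault, environment variables):

```csharp
// Sketch: configuration keys below are illustrative, not adapter conventions.
services.AddKafkaAdapter(options =>
{
    options.BootstrapServers = configuration["Kafka:BootstrapServers"];
    options.ClientId = "my-app";
    options.Security.Protocol = KafkaSecurityProtocol.Ssl;

    // In-memory PEM variants avoid shipping certificate files with the app.
    options.Security.SslCaPem = configuration["Kafka:Tls:CaPem"];
    options.Security.SslCertificatePem = configuration["Kafka:Tls:ClientCertPem"];
    options.Security.SslKeyPem = configuration["Kafka:Tls:ClientKeyPem"];
    options.Security.SslKeyPassword = configuration["Kafka:Tls:KeyPassword"];
});
```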
- Keep BootstrapServers, usernames, and passwords in configuration or secret storage rather than source code.
- Keep certificate file paths, PEM content, and key passwords in configuration or secret storage rather than source code.
- Use KafkaSecurityProtocol.SaslSsl or KafkaSecurityProtocol.Ssl for remote brokers unless you explicitly control a trusted plaintext network.
- Use the TLS settings only when your broker protocol is Ssl or SaslSsl; they remain optional and are ignored for plaintext configurations.
- Decide whether stopping a consumer loop on an unrecoverable failure is acceptable for your service model before using the current defaults in production.
- Provision dead-letter topics explicitly and monitor them; the adapter publishes DLQ messages but does not re-drive them.
- Integration tests in this repo validate the local broker path, not a production cluster topology.
For production operation, monitor the adapter at both the service and Kafka level.
Recommended service-level signals:
- publish failures from KafkaMessagePublisher
- consumer loop failures from KafkaConsumerHostedService
- retry attempt counts
- dead-letter publish counts
- dead-letter topic growth over time
- handler processing latency
- consumer lag for each topic and consumer group
Recommended operational alerts:
- any consumer loop stops unexpectedly
- dead-letter traffic exceeds a normal baseline
- consumer lag continues to grow for a sustained window
- repeated deserialization failures for the same topic
- repeated handler failures for the same message type or handler
Operationally, the most important current distinction is:
- deserialization failures are not retried in-process
- handler failures are retried in-process and may be dead-lettered after retries are exhausted
That means deserialization failures usually point to a producer-contract mismatch, while handler failures usually point to business logic or downstream dependency issues.
The adapter publishes a KafkaDeadLetterMessage payload to the configured dead-letter topic. The message includes:
- original topic
- consumer group
- original key and message ID
- serialized payload
- headers
- partition, offset, and timestamp
- message type and handler type
- failure stage, error message, exception type, and attempt count
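For tooling that reads the DLQ, a record mirroring those fields might look like the following. The property names are inferred from the list above and are illustrative only; they may not match the library's actual KafkaDeadLetterMessage members:

```csharp
// Illustrative shape for DLQ tooling; not the library's actual type definition.
public sealed record DeadLetterEnvelope(
    string OriginalTopic,
    string ConsumerGroup,
    string? Key,
    string? MessageId,
    string SerializedPayload,
    IReadOnlyDictionary<string, string> Headers,
    int Partition,
    long Offset,
    DateTimeOffset Timestamp,
    string MessageType,
    string HandlerType,
    string FailureStage,   // e.g. "deserialization" or "handler"
    string ErrorMessage,
    string ExceptionType,
    int AttemptCount);
```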
Use that information to separate recovery paths:
- if FailureStage is deserialization, fix the producer or schema/contract mismatch before replaying
- if FailureStage is handler, fix the handler or downstream dependency before replaying
- if the payload is irrecoverable, archive it and do not replay it blindly
For replay, a safe default process is:
- identify the root cause and deploy the fix first
- inspect the DLQ payloads and filter to only the recoverable subset
- republish those messages to the original topic or a dedicated retry topic
- monitor consumer lag, retry counts, and DLQ volume during replay
- keep replay idempotent at the handler level, because the adapter does not guarantee exactly-once handling
Avoid wiring automatic DLQ re-drive back into the same consumer path until you have explicit idempotency and replay controls in place. Otherwise a poison message can loop between the primary topic and DLQ.
For low-volume recovery, manual replay with an operator-reviewed tool or script is the safest option.
For higher-volume recovery, prefer a separate replay worker that:
- reads from the DLQ topic
- validates message age, failure stage, and replay eligibility
- republishes at a controlled rate
- tags replayed messages with a header or message attribute if you add that behavior later
- writes an audit log of what was replayed and why
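The eligibility check at the heart of such a worker can be small. A sketch, assuming the failure stage and original timestamp have already been read from the deserialized DLQ payload; MaxReplayAge and the stage string are illustrative policy choices, not adapter features:

```csharp
// Sketch of a replay-eligibility policy for a separate replay worker.
public static class ReplayPolicy
{
    // Illustrative cutoff: messages older than this are archived, not replayed.
    private static readonly TimeSpan MaxReplayAge = TimeSpan.FromDays(7);

    public static bool IsEligible(string failureStage, DateTimeOffset originalTimestamp, DateTimeOffset now)
    {
        // Deserialization failures need a producer/contract fix first;
        // replaying them unchanged would fail again immediately.
        if (failureStage == "deserialization")
        {
            return false;
        }

        return now - originalTimestamp <= MaxReplayAge;
    }
}
```

Keeping the policy in one pure function makes it easy to unit test and to audit alongside the replay log.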
The current adapter intentionally does not include built-in replay automation. That keeps the core library small and avoids baking operational policy into the transport layer.
The adapter now validates configuration at startup through the .NET options pipeline, so invalid broker, security, producer, or consumer settings fail fast during host startup instead of surfacing later inside background tasks.
Current stability guardrails:
- BootstrapServers must be configured
- SASL username and password are required for SASL protocols
- consumer topic, consumer group, message type, and handler type must be configured
- retry counts must be zero or greater
- configured topic values cannot be whitespace
The intent is to keep the public surface small and predictable while catching invalid registrations before runtime traffic begins.
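From the application side, fail-fast validation looks roughly like the following. The exact exception type depends on how the library wires validation; OptionsValidationException is the usual result of ValidateOnStart in the .NET options pipeline, so treat that detail as an assumption:

```csharp
// Sketch: an intentionally invalid registration aborting host startup.
var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddKafkaAdapter(options =>
{
    // BootstrapServers intentionally left unset, so this registration is invalid.
    options.ClientId = "my-app";
});

try
{
    builder.Build().Run();
}
catch (OptionsValidationException ex)
{
    // Surfaces during host startup rather than later inside a background consumer.
    Console.Error.WriteLine($"Invalid Kafka adapter configuration: {ex.Message}");
}
```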
The adapter emits built-in metrics through the .NET Meter named DotNetKafkaAdapter. The stable names live in KafkaAdapterInstrumentation.cs.
Current instruments include:
- dotnet_kafka_adapter.messages.published
- dotnet_kafka_adapter.publish.failures
- dotnet_kafka_adapter.publish.duration
- dotnet_kafka_adapter.messages.handled
- dotnet_kafka_adapter.consume.failures
- dotnet_kafka_adapter.deserialization.failures
- dotnet_kafka_adapter.handler.failures
- dotnet_kafka_adapter.retry.attempts
- dotnet_kafka_adapter.dead_letter.published
- dotnet_kafka_adapter.offset_commit.failures
- dotnet_kafka_adapter.consumers.active
The metrics use lightweight tags such as:
- topic
- consumer_group
- handler
- dead_letter_topic
- stage
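To inspect the counters locally without wiring up a metrics exporter, the standard System.Diagnostics.Metrics.MeterListener can subscribe to the adapter's meter by name. Only the meter name "DotNetKafkaAdapter" comes from the adapter; everything else here is the base class library:

```csharp
using System.Diagnostics.Metrics;

// Minimal in-process listener that prints long-valued measurements
// (counters such as messages.published) from the adapter's meter.
var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "DotNetKafkaAdapter")
    {
        l.EnableMeasurementEvents(instrument);
    }
};
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
{
    Console.WriteLine($"{instrument.Name} += {measurement}");
});
listener.Start();
```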
If you are using OpenTelemetry in an application, add the meter with:
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics.AddMeter("DotNetKafkaAdapter"));
Decide how far to take production hardening, especially packaging and advanced broker security support.