Example (template) project demonstrating how to integrate Apache Kafka with Spring Boot in a modern Java application.
This repository contains:
- producers (REST controllers) that publish messages to Kafka topics
- consumers using `@KafkaListener` that deserialize DTOs and persist them to H2 using Spring Data JPA
- full configuration of `ProducerFactory`/`ConsumerFactory`/`ConcurrentKafkaListenerContainerFactory`
- error handling with `DefaultErrorHandler` and a Dead Letter Queue (DLQ)
- integration tests using `spring-kafka-test` (Embedded Kafka)
Main technologies
- Java 21
- Spring Boot 3.5.7
- Spring Kafka
- Spring Data JPA + H2 (in-memory for tests)
- Lombok (to reduce boilerplate)
- Spring DevTools (development hot reload)
- Spring Actuator (monitoring and metrics)
- JUnit 5
Contents
- Overview
- Topics
- How to run (quickstart)
- Available REST APIs
- Testing and EmbeddedKafka test synchronization
- Architecture and design notes
- Debug & troubleshooting
- Contributing
This project is intended as a starting point for experiments and demos. The main components are under the `it.alf` package.
Consumers receive JSON messages that are deserialized into DTOs (classes under `it.alf.dto.*`) and persisted into H2 tables using the JPA entities under `it.alf.entity`.
Topics used by the application (configured in `application.yml`):
- `users-topic` — messages of type `User`
- `orders-topic` — messages of type `Order`
- `notifications-topic` — messages of type `Notification`
- `events-topic` — messages of type `GenericEvent`
- `dlq-topic` — dead-letter queue used when messages cannot be processed correctly
NewTopic beans are created by KafkaConfig at startup to simplify the developer experience.
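A minimal sketch of how such beans can be declared with Spring Kafka's `TopicBuilder`; the partition and replica counts here are illustrative assumptions, the actual values live in `KafkaConfig`:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
class TopicConfig {

    // NewTopic beans are picked up by Spring's KafkaAdmin at startup and
    // created if they do not already exist on the broker.
    @Bean
    NewTopic usersTopic() {
        return TopicBuilder.name("users-topic").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlqTopic() {
        return TopicBuilder.name("dlq-topic").partitions(1).replicas(1).build();
    }
    // ...similar beans for orders-topic, notifications-topic, events-topic
}
```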
Prerequisites
- Docker (to run Kafka locally) or a running Kafka broker
- Maven 3.8+ and JDK 21
- Start Kafka locally (optional):

```bash
docker-compose up -d
```

- Run the application in development mode:

```bash
mvn spring-boot:run
```

- Use the REST APIs to publish messages to Kafka (PowerShell/curl examples below)
- Create a `User`:

```bash
curl -X POST http://localhost:8080/api/users -H "Content-Type: application/json" -d '{"id":"u1","name":"Alice","email":"alice@example.com"}'
```

- Create an `Order`:

```bash
curl -X POST http://localhost:8080/api/orders -H "Content-Type: application/json" -d '{"id":"o1","userId":"u1","product":"Book","amount":12.5}'
```

- Send a `Notification`:

```bash
curl -X POST http://localhost:8080/api/notifications -H "Content-Type: application/json" -d '{"id":"n1","message":"Order received","level":"INFO"}'
```

- Send a `GenericEvent`:

```bash
curl -X POST http://localhost:8080/api/events -H "Content-Type: application/json" -d '{"id":"e1","type":"user.signup","payload":{"userId":"u1"}}'
```

Consumers persist messages into H2 tables. The app exposes GET endpoints to read saved entities (e.g. `/api/users/{id}`), or you can open the H2 console if enabled (see `application.yml`).
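For reference, a minimal sketch of what one of these producer endpoints might look like; the `UserController` class, mapping, and response code are illustrative assumptions, not the project's exact code:

```java
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical producer controller; assumes a User DTO under it.alf.dto.
@RestController
@RequestMapping("/api/users")
class UserController {

    private final KafkaTemplate<String, User> kafkaTemplate;

    UserController(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    ResponseEntity<Void> publish(@RequestBody User user) {
        // key by user id so all events for the same user land on the same partition
        kafkaTemplate.send("users-topic", user.getId(), user);
        return ResponseEntity.accepted().build();
    }
}
```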
The application includes Spring Actuator for monitoring. Once the app is running, the following endpoints are available:
- Health Check: `http://localhost:8080/actuator/health`
- App Info: `http://localhost:8080/actuator/info`
- Metrics: `http://localhost:8080/actuator/metrics`
- Environment: `http://localhost:8080/actuator/env`
- Config Props: `http://localhost:8080/actuator/configprops`
- Loggers: `http://localhost:8080/actuator/loggers`
- Thread Dump: `http://localhost:8080/actuator/threaddump`
- Heap Dump: `http://localhost:8080/actuator/heapdump`
The project includes Spring DevTools, which provides:
- Automatic restart when Java files change
- LiveReload integration for the browser
- Development-optimized defaults
To use automatic restart, modify and save a Java file — the application will restart automatically.
Testing and EmbeddedKafka test synchronization
Integration tests rely on `spring-kafka-test` and `@EmbeddedKafka` to run an in-process Kafka broker.
Common issues
- Race conditions: a producer might send a message before the listener container is assigned to partitions. This can cause flaky tests with empty repositories.
- Shared context: Spring tests share the application context by default and can interfere with each other.
Solutions used in this project
- Integration tests explicitly wait for the listener container assignment before sending messages, using `KafkaListenerEndpointRegistry` and `ContainerTestUtils.waitForAssignment(...)`.
- `@DirtiesContext(classMode = AFTER_EACH_TEST_METHOD)` is used to isolate tests when necessary.
- Tests use a simple polling approach with a timeout (extended to 10 s) to wait for the entity to be written to the DB. You can replace this with Awaitility for more expressive assertions.
Typical test flow:
- wait for the listener container assignment
- send the message with `KafkaTemplate.send(...).get()`
- poll the repository until the record appears (see the sketch below)
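A minimal sketch of this flow, assuming a `users-topic` listener, a Lombok-generated `User` constructor, and a hypothetical `UserRepository`; class and package names are illustrative, not the actual test class:

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "users-topic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class UserFlowIntegrationTest {

    @Autowired KafkaTemplate<String, User> kafkaTemplate;   // User: DTO under it.alf.dto
    @Autowired KafkaListenerEndpointRegistry registry;
    @Autowired EmbeddedKafkaBroker broker;
    @Autowired UserRepository repository;                   // hypothetical JPA repository

    @Test
    void messageIsConsumedAndPersisted() throws Exception {
        // 1. wait until every listener container has its partitions assigned
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(container, broker.getPartitionsPerTopic());
        }

        // 2. send synchronously so the send cannot race the assertion
        kafkaTemplate.send("users-topic", new User("u1", "Alice", "alice@example.com")).get();

        // 3. poll the repository until the record appears (10 s timeout)
        long deadline = System.currentTimeMillis() + 10_000;
        while (repository.findById("u1").isEmpty() && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
        }
        assertTrue(repository.findById("u1").isPresent());
    }
}
```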
Architecture and design notes
- `KafkaConfig` configures producer/consumer factories and provides typed `ConcurrentKafkaListenerContainerFactory` instances for each DTO.
- `DefaultErrorHandler` is configured with a `DeadLetterPublishingRecoverer` that publishes failed records to `dlq-topic` after a configured number of retries.
- Consumers are simple `@Service` components with `@KafkaListener(...)` that map DTOs to JPA entities and persist them.
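A hedged sketch of the last two pieces, the DLQ-publishing error handler and a typed listener; the retry values, bean names, and the `UserRepository`/`UserEntity` types are illustrative assumptions, not the project's exact code:

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class ErrorHandlingConfig {

    // Retries a failed record up to 3 times, 1 s apart, then publishes it to
    // dlq-topic. Wire this into the container factory with
    // factory.setCommonErrorHandler(errorHandler).
    @Bean
    DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                template,
                (record, ex) -> new TopicPartition("dlq-topic", record.partition()));
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}

@Service
class UserConsumer {

    private final UserRepository repository; // hypothetical Spring Data repository

    UserConsumer(UserRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "users-topic", containerFactory = "userKafkaListenerContainerFactory")
    public void onUser(User user) {
        // map the DTO to a JPA entity and persist it (UserEntity is assumed)
        repository.save(new UserEntity(user.getId(), user.getName(), user.getEmail()));
    }
}
```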
Design choices and rationale
- `JsonSerializer`/`JsonDeserializer`: we use Spring Kafka's JSON serializer for simplicity and readability.
- Typed consumers (`ConsumerFactory`, etc.) enable direct deserialization into DTO classes without manual conversion; see the sketch after this list.
- Lombok: entities and DTOs use Lombok to remove boilerplate (getters/setters/constructors). Main annotations used:
  - `@Data` — generates getters, setters, toString, equals, and hashCode
  - `@NoArgsConstructor` — no-args constructor (required by JPA and Jackson)
  - `@AllArgsConstructor` — all-args constructor
- Spring DevTools: configured for automatic restart and LiveReload during development.
- Spring Actuator: configured to expose detailed monitoring endpoints for health, metrics, and configuration.
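To illustrate the typed-consumer choice, a sketch of a consumer factory wired to a Lombok DTO; the DTO fields come from the curl example above, while the group id and bootstrap address are assumptions:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// DTO shape inferred from the /api/users example; Lombok generates the boilerplate.
@Data
@NoArgsConstructor
@AllArgsConstructor
class User {
    private String id;
    private String name;
    private String email;
}

@Configuration
class UserConsumerConfig {

    @Bean
    ConsumerFactory<String, User> userConsumerFactory() {
        JsonDeserializer<User> deserializer = new JsonDeserializer<>(User.class);
        deserializer.addTrustedPackages("it.alf.dto"); // trust only the project's DTO package
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "users-group");             // illustrative
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    // The @KafkaListener above references this factory by bean name.
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory(
            ConsumerFactory<String, User> userConsumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, User>();
        factory.setConsumerFactory(userConsumerFactory);
        return factory;
    }
}
```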
Debug & troubleshooting
- Tests fail with empty repositories: ensure listeners are assigned; check the test logs for "partitions assigned".
- Messages in the DLQ: inspect the `dlq-topic` records to diagnose deserialization or business exceptions.
Contributing
Pull requests are welcome. For structural changes to tests, prefer incremental refactoring and verify changes with `mvn test`.
Generated documentation and inline code comments are provided to aid readability.
This section explains how to quickly try Kafka in a Kubernetes cluster using the Strimzi operator. The simplest approach for local development is to deploy Strimzi on a local cluster (minikube/kind) and then use port-forwarding to connect your local application.
Prerequisites
- `kubectl` configured for your cluster (minikube, kind, or OpenShift)
- `helm` (optional)
- access to the machine running the cluster (minikube/kind)
- Create the namespace and install the Strimzi operator:

```bash
kubectl create namespace kafka
# install Strimzi (operator + CRDs)
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
```

- Deploy a minimal Kafka cluster (1 replica) — save as `kafka-cluster.yaml` and apply:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.4.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator: {}
```

```bash
kubectl apply -f kafka-cluster.yaml -n kafka
kubectl -n kafka wait kafka/my-cluster --for=condition=Ready --timeout=300s
kubectl -n kafka get pods
```

- Create a topic (example: `users-topic`, used by the app)
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: users-topic
  labels:
    strimzi.io/cluster: my-cluster
  namespace: kafka
spec:
  partitions: 1
  replicas: 1
```

```bash
kubectl apply -f users-topic.yaml -n kafka
kubectl -n kafka get kafkatopic
```

- Quick access from your local machine (port-forward)
This method is convenient for developing and testing the local application without deploying the app into Kubernetes.
```bash
# forward the bootstrap service to localhost:9092
kubectl -n kafka port-forward svc/my-cluster-kafka-bootstrap 9092:9092
```

Then configure `application.yml` (or your `spring.kafka.bootstrap-servers`) with:

```yaml
spring.kafka.bootstrap-servers: localhost:9092
```
Note: if you configured the external listener as `nodeport`, you can use the node IP and the NodePort exposed by the `my-cluster-kafka-external-bootstrap` service.
- Test with Strimzi clients (console producer/consumer)
Quick example using a temporary pod that contains Kafka tools:
```bash
kubectl run --rm -i --tty kafka-client --image=strimzi/kafka:latest -- bash
# inside the pod shell
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic users-topic
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic users-topic --from-beginning
```

- Run the app locally and send messages
With the port-forward active, you can start the app with `mvn spring-boot:run` (or run the JAR) and use the existing REST endpoints (for example `/api/users`) to publish messages to the Strimzi topics.
Tips and notes
- For more realistic CI/CD tests you can deploy the application into the same Kubernetes cluster.
- For production use, prefer `external` listeners with TLS and authentication (SCRAM or TLS) — Strimzi manages `KafkaUser` resources and TLS secrets for you.
- CR API versions can change across Strimzi releases; consult the official docs (https://strimzi.io/docs) for the version you install.
Possible extensions: add the full manifests as files in the repository, provide a kubectl demo script, or create a GitHub Action that brings up a kind cluster, installs Strimzi, and runs a smoke test.