Skip to content

Commit 20eef96

Browse files
committed
Use Spring Integration Messaging System to Talk with kafka with schema registry.
1 parent 7bf07e7 commit 20eef96

File tree

3 files changed

+34
-7
lines changed

3 files changed

+34
-7
lines changed

docker-compose.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
version: '3.8'
22
services:
3-
43
zookeeper:
54
container_name: zookeeper
65
restart: always

src/main/java/org/wsd/app/bootloader/BootLoader.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class BootLoader implements CommandLineRunner {
5252
private final PhotoRepository photoRepository;
5353
private final PersonRepository personRepository;
5454

55-
private final KafkaTemplate<UUID, SensorEventAvro> sensorEventAvroKafkaTemplate;
55+
5656

5757
@Override
5858
@Transactional
@@ -78,8 +78,6 @@ public void run(String... args) throws Exception {
7878
.setY(Math.random())
7979
.build();
8080

81-
sensorEventAvroKafkaTemplate.send(TopicConfiguration.SENSOR, UUID.randomUUID(), eventAvro);
82-
System.out.println("Okay");
8381

8482
// personRepository.save(person);
8583

src/main/java/org/wsd/app/messaging/pubs/ProducerService.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import org.springframework.boot.CommandLineRunner;
2828
import org.springframework.context.annotation.Bean;
2929
import org.springframework.kafka.core.KafkaTemplate;
30+
import org.springframework.kafka.support.KafkaHeaders;
3031
import org.springframework.kafka.support.SendResult;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.support.MessageBuilder;
3134
import org.springframework.retry.annotation.Recover;
3235
import org.springframework.stereotype.Service;
3336
import org.springframework.transaction.TransactionException;
@@ -43,17 +46,44 @@
4346
@Service
4447
@RequiredArgsConstructor
4548
public class ProducerService {
46-
private final KafkaTemplate<UUID, SensorEventAvro> kafkaTemplate;
49+
private final UserRepository userRepository;
4750

4851

49-
// @Transactional(value = "transactionManager", rollbackFor = Exception.class)
52+
private final KafkaTemplate<UUID, SensorEventAvro> sensorEventAvroKafkaTemplate;
53+
54+
@Bean
55+
public CommandLineRunner commandLineRunner(ProducerService producerService) {
56+
return args -> {
57+
for (int i = 0; i < 10; i++) {
58+
try {
59+
producerService.process(i);
60+
} catch (Exception e) {
61+
System.out.println(e.getMessage());
62+
}
63+
}
64+
};
65+
}
66+
67+
@Transactional(value = "transactionManager", rollbackFor = Exception.class)
5068
public void process(int i) throws TransactionException {
5169
final SensorEventAvro sensorEvent = new SensorEventAvro();
5270
sensorEvent.setX(i);
5371
sensorEvent.setY(Math.random());
5472

73+
SensorEventAvro eventAvro = SensorEventAvro.newBuilder()
74+
.setId(UUID.randomUUID().toString())
75+
.setX(1)
76+
.setY(Math.random())
77+
.build();
78+
79+
final Message<SensorEventAvro> message = MessageBuilder
80+
.withPayload(eventAvro)
81+
.setHeader(KafkaHeaders.KEY, UUID.randomUUID())
82+
.setHeader(KafkaHeaders.TOPIC, TopicConfiguration.SENSOR)
83+
.setHeader(KafkaHeaders.TIMESTAMP, System.currentTimeMillis())
84+
.build();
5585

56-
CompletableFuture<? extends SendResult<UUID, ?>> future = kafkaTemplate.send(TopicConfiguration.SENSOR, UUID.randomUUID(), sensorEvent);
86+
CompletableFuture<? extends SendResult<UUID, ?>> future = sensorEventAvroKafkaTemplate.send(message);
5787
future.thenAccept(uuidSendResult -> {
5888
log.info("Message sent successfully.");
5989
}).exceptionally(exception -> {

0 commit comments

Comments
 (0)