Commit 2df3b4d

Now it can adapt to any schema.

1 parent 905eca9

7 files changed: +44 -52 lines

docker-compose.yml

Lines changed: 2 additions & 0 deletions
@@ -42,6 +42,8 @@ services:
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka-1:19092"
+      # SCHEMA_COMPATIBILITY: BACKWARD
+      SCHEMA_COMPATIBILITY: FULL_TRANSITIVE
     depends_on:
       - zookeeper
       - kafka-1
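
Pinning the registry to FULL_TRANSITIVE means every new schema version must be both backward- and forward-compatible with all previously registered versions, not just the latest one. A candidate schema can be verified against the registry before anything is produced; the sketch below is a minimal, hypothetical check using the Confluent client (the subject name "sensor-value" assumes the default TopicNameStrategy and a topic literally named "sensor", and the id/x field types are inferred from the producer code in this commit):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilityCheck {
    public static void main(String[] args) throws Exception {
        // Same registry URL the Spring config points at.
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // Candidate V2 schema: "y" replaced by "z", which carries a default.
        String v2 = "{\"type\":\"record\",\"name\":\"SensorEventAvro\","
                + "\"namespace\":\"org.wsd.app.event\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"x\",\"type\":\"double\"},"
                + "{\"name\":\"z\",\"type\":\"double\",\"default\":-1}]}";
        // The registry performs the check server-side against the
        // compatibility level configured for the subject.
        boolean ok = client.testCompatibility("sensor-value", new AvroSchema(v2));
        System.out.println("compatible with all registered versions: " + ok);
    }
}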

src/main/avro/SensorEvent.avsc

Lines changed: 4 additions & 3 deletions
@@ -1,7 +1,7 @@
 {
     "type": "record",
     "name": "SensorEventAvro",
-    "namespace": "org.wsd.app.avro",
+    "namespace": "org.wsd.app.event",
     "fields": [
         {
             "name": "id",
@@ -12,8 +12,9 @@
             "type": "double"
         },
         {
-            "name": "y",
-            "type": "double"
+            "name": "z",
+            "type": "double",
+            "default": -1
         }
     ]
 }
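
Because the new "z" field declares a default, the change is safe in both directions: a reader on this schema that decodes an old record lacking "z" fills in -1, and writers may omit the field entirely. A minimal sketch against the class the Avro plugin generates from this schema (the setId/setX names are taken from the producer code in this commit):

import org.wsd.app.event.SensorEventAvro;
import java.util.UUID;

public class SchemaEvolutionDemo {
    public static void main(String[] args) {
        // setZ(...) is omitted on purpose: build() falls back to the
        // default (-1) declared for "z" in SensorEvent.avsc.
        SensorEventAvro event = SensorEventAvro.newBuilder()
                .setId(UUID.randomUUID().toString())
                .setX(1)
                .build();
        System.out.println(event.getZ()); // prints -1.0
    }
}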

src/main/java/org/wsd/app/bootloader/BootLoader.java

Lines changed: 0 additions & 18 deletions
@@ -23,28 +23,16 @@
 package org.wsd.app.bootloader;
 
 import com.github.javafaker.Faker;
-import io.micrometer.observation.annotation.Observed;
 import lombok.RequiredArgsConstructor;
 import org.springframework.boot.CommandLineRunner;
-import org.springframework.data.mongodb.core.query.Query;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-import org.wsd.app.avro.SensorEventAvro;
-import org.wsd.app.config.TopicConfiguration;
 import org.wsd.app.domain.PhotoEntity;
 import org.wsd.app.mongo.Address;
 import org.wsd.app.mongo.Gender;
 import org.wsd.app.mongo.Person;
 import org.wsd.app.mongo.PersonRepository;
 import org.wsd.app.repository.PhotoRepository;
-import org.wsd.app.repository.UserRepository;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Stream;
 
 @Service
 @RequiredArgsConstructor
@@ -72,12 +60,6 @@ public void run(String... args) throws Exception {
         person.setAddress(address);
         person.setGender(Gender.MALE);
 
-        SensorEventAvro eventAvro = SensorEventAvro.newBuilder()
-                .setId(UUID.randomUUID().toString())
-                .setX(1)
-                .setY(Math.random())
-                .build();
-
 
         // personRepository.save(person);
 
src/main/java/org/wsd/app/config/KafkaConfig.java

Lines changed: 18 additions & 7 deletions
@@ -22,7 +22,10 @@
 
 package org.wsd.app.config;
 
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -54,6 +57,7 @@
 import org.springframework.retry.policy.SimpleRetryPolicy;
 import org.springframework.retry.support.RetryTemplate;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -117,16 +121,26 @@ public RetryTemplate retryTemplate() {
         return retryTemplate;
     }
 
+
     @Bean
     public ConsumerFactory<String, Object> consumerFactory() {
         final Map<String, Object> configProps = new HashMap<>();
         configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 
         configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class.getName());
-        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
-//        // Specify the actual deserializer classes as properties of ErrorHandlingDeserializer
-//        configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class.getName());
-//        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class.getName());
+        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
+        configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, UUIDDeserializer.class.getName());
+        configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class.getName());
+
+        // Configure the trusted package for Avro deserialization
+        configProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+        configProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
+
+        configProps.put("specific.avro.reader.schema.trusted", "true");
+        configProps.put("schema.registry.compatibility.level", "FULL_TRANSITIVE");
+
+        // Specify the schema registry URL
+
 
         // Trusted package
         configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "org.wsd.app.event");
@@ -150,9 +164,6 @@ public ConsumerFactory<String, Object> consumerFactory() {
         configProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "300000");
         configProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "305000");
         configProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "60000");
-
-        configProps.put("schema.registry.url", "http://localhost:8081");
-        configProps.put("specific.avro.read", "true");
         return new DefaultKafkaConsumerFactory<>(configProps);
     }
 
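
The consumer factory above pairs with a producer factory that mirrors these settings on the serializer side. The commit does not show that bean, so the following is only a sketch of what it plausibly looks like inside the same KafkaConfig class; it assumes the existing bootstrapServers field plus imports for ProducerConfig, UUIDSerializer, DefaultKafkaProducerFactory, KafkaTemplate, and org.wsd.app.event.SensorEventAvro:

    @Bean
    public ProducerFactory<UUID, SensorEventAvro> producerFactory() {
        final Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        // The serializer talks to the same registry the consumer reads from,
        // so FULL_TRANSITIVE is enforced when it registers a new version.
        configProps.put("schema.registry.url", "http://localhost:8081");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<UUID, SensorEventAvro> sensorEventAvroKafkaTemplate(
            ProducerFactory<UUID, SensorEventAvro> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }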

src/main/java/org/wsd/app/messaging/pubs/ProducerService.java

Lines changed: 5 additions & 7 deletions
@@ -35,7 +35,7 @@
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionException;
 import org.springframework.transaction.annotation.Transactional;
-import org.wsd.app.avro.SensorEventAvro;
+import org.wsd.app.event.SensorEventAvro;
 import org.wsd.app.config.TopicConfiguration;
 import org.wsd.app.repository.UserRepository;
 
@@ -48,15 +48,14 @@
 public class ProducerService {
     private final UserRepository userRepository;
 
-
     private final KafkaTemplate<UUID, SensorEventAvro> sensorEventAvroKafkaTemplate;
 
     @Bean
     public CommandLineRunner commandLineRunner(ProducerService producerService) {
         return args -> {
             for (int i = 0; i < 10; i++) {
                 try {
-                    producerService.process(i);
+                    // producerService.process(i);
                 } catch (Exception e) {
                     System.out.println(e.getMessage());
                 }
@@ -66,19 +65,18 @@ public CommandLineRunner commandLineRunner(ProducerService producerService) {
 
     @Transactional(value = "transactionManager", rollbackFor = Exception.class)
     public void process(int i) throws TransactionException {
-        final SensorEventAvro sensorEvent = new SensorEventAvro();
-        sensorEvent.setX(i);
-        sensorEvent.setY(Math.random());
+
 
         SensorEventAvro eventAvro = SensorEventAvro.newBuilder()
                 .setId(UUID.randomUUID().toString())
                 .setX(1)
-                .setY(Math.random())
+                .setZ(Math.random())
                 .build();
 
         final Message<SensorEventAvro> message = MessageBuilder
                 .withPayload(eventAvro)
                 .setHeader(KafkaHeaders.KEY, UUID.randomUUID())
+                .setHeader("schema.version", "V2")
                 .setHeader(KafkaHeaders.TOPIC, TopicConfiguration.SENSOR)
                 .setHeader(KafkaHeaders.TIMESTAMP, System.currentTimeMillis())
                 .build();
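
The hunk ends before the actual publish; presumably the method hands message to the injected template. A minimal sketch of that last step with result handling, assuming Spring Kafka 3.x, where send() returns a CompletableFuture<SendResult<K, V>> (on 2.x it returns a ListenableFuture instead):

        sensorEventAvroKafkaTemplate.send(message)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        System.out.println("Publish failed: " + ex.getMessage());
                    } else {
                        // RecordMetadata carries the broker-assigned partition and offset.
                        System.out.println("Published: " + result.getRecordMetadata());
                    }
                });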

src/main/java/org/wsd/app/messaging/subs/ConsumerService.java

Lines changed: 3 additions & 15 deletions
@@ -24,31 +24,19 @@
 
 import lombok.extern.log4j.Log4j2;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.context.annotation.Lazy;
 import org.springframework.kafka.annotation.DltHandler;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.annotation.RetryableTopic;
-import org.springframework.kafka.annotation.TopicPartition;
 import org.springframework.kafka.support.Acknowledgment;
-import org.springframework.kafka.support.converter.ConversionException;
-import org.springframework.kafka.support.serializer.DeserializationException;
-import org.springframework.messaging.converter.MessageConversionException;
-import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
 import org.springframework.retry.annotation.Backoff;
 import org.springframework.stereotype.Service;
-import org.wsd.app.avro.SensorEventAvro;
+import org.wsd.app.event.SensorEventAvro;
 import org.wsd.app.config.TopicConfiguration;
-import org.wsd.app.event.SensorEvent;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 @Log4j2
 @Service
@@ -59,8 +47,8 @@ public class ConsumerService {
         backoff = @Backoff(delay = 10, multiplier = 1.5, maxDelay = 2000)
     )
     @KafkaListener(topics = TopicConfiguration.SENSOR, groupId = "sensor-group", containerFactory = "kafkaListenerContainerFactory")
-    public void consumerGroup1(@Payload GenericRecord record) {
-        log.info("Consumed : " + record);
+    public void consumerGroup1(@Payload ConsumerRecord<UUID, SensorEventAvro> record) {
+        log.info("Consumed : " + record.value());
     }
 
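The DltHandler import survives the cleanup, which suggests a dead-letter handler lives alongside this listener. A hypothetical sketch of one, assuming the <topic>-dlt naming that @RetryableTopic auto-creates by default:

    @DltHandler
    public void handleDlt(ConsumerRecord<UUID, SensorEventAvro> record) {
        // Invoked once the @RetryableTopic back-off attempts are exhausted;
        // the record arrives on the auto-created <topic>-dlt topic.
        log.error("Dead-lettered after retries: " + record.value());
    }
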
src/main/resources/application.properties

Lines changed: 12 additions & 2 deletions
@@ -1,3 +1,4 @@
+# Hibernate Level Cache Configuration
 spring.jpa.properties.javax.persistence.sharedCache.mode=ENABLE_SELECTIVE
 spring.jpa.properties.hibernate.cache.use_second_level_cache=true
 spring.jpa.properties.hibernate.cache.use_query_cache=true
@@ -13,17 +14,26 @@ spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl
 spring.quartz.properties.org.quartz.jobStore.isClustered=true
 spring.quartz.properties.org.quartz.scheduler.instanceId=AUTO
 
-# Log
+# Log Configuration
 logging.file.name=logs/app.log
 
 # The filename pattern used to create log archives.
 logging.logback.rollingpolicy.file-name-pattern=logs/%d{yyyy-MM, aux}/app.%d{yyyy-MM-dd}.%i.log
 
 # The maximum size of a log file before it is archived.
-logging.logback.rollingpolicy.max-file-size=1MB
+logging.logback.rollingpolicy.max-file-size=10MB
 
 # The maximum total size log archives can take before being deleted.
 logging.logback.rollingpolicy.total-size-cap=10GB
 
 # The maximum number of archived log files to keep (defaults to 7).
 logging.logback.rollingpolicy.max-history=10
+
+## root level
+#logging.level.root=error
+#
+## package level logging
+#logging.level.org.springframework.web=debug
+#logging.level.org.hibernate=error
+#logging.level.com.mkyong=error
+