From ef504c312bb785abaf9e06b7de2ca34332bc0d12 Mon Sep 17 00:00:00 2001 From: Naveen Kumar Date: Thu, 5 Sep 2024 12:25:50 +0530 Subject: [PATCH] "error with deserialization" --- .../Entity/FoodOrderNotification.java | 49 ----------------- .../configs/KafkaConsumerConfig.java | 24 ++++---- .../model/Notification.java | 55 +++++++++++++++++++ .../service/KafkaConsumerService.java | 16 +++--- src/main/resources/application.properties | 3 +- 5 files changed, 78 insertions(+), 69 deletions(-) delete mode 100644 src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/Entity/FoodOrderNotification.java create mode 100644 src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/model/Notification.java diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/Entity/FoodOrderNotification.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/Entity/FoodOrderNotification.java deleted file mode 100644 index d20043b..0000000 --- a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/Entity/FoodOrderNotification.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.itsnaveenk.spring_kafka_consumer_elastic.Entity; - -import org.springframework.data.elasticsearch.annotations.Document; - -@Document(indexName = "food_order_index") -public class FoodOrderNotification { - private String user_id; - private String message; - private String recipient_id; - - public FoodOrderNotification(String user_id, String message, String recipient_id) { - this.user_id = user_id; - this.message = message; - this.recipient_id = recipient_id; - } - - public String getUser_id() { - return user_id; - } - - public void setUser_id(String user_id) { - this.user_id = user_id; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public String getRecipient_id() { - return recipient_id; - } - - public void setRecipient_id(String recipient_id) { - this.recipient_id = recipient_id; - } - - @Override - public String toString() { - return "FoodOrderNotification{" + - "user_id='" + user_id + '\'' + - ", message='" + message + '\'' + - ", recipient_id='" + recipient_id + '\'' + - '}'; - } -} diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/KafkaConsumerConfig.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/KafkaConsumerConfig.java index 0b1964f..2c4aeaf 100644 --- a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/KafkaConsumerConfig.java +++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/KafkaConsumerConfig.java @@ -1,5 +1,6 @@ package com.itsnaveenk.spring_kafka_consumer_elastic.configs; +import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; @@ -20,23 +21,24 @@ public class KafkaConsumerConfig { private String bootstrapAddress; @Bean - public ConsumerFactory consumerFactory() { + public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "springConsumerGroup1"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,ErrorHandlingDeserializer.class); - props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // Adjust package names as needed + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + config.put(ConsumerConfig.GROUP_ID_CONFIG, "springConsumerGroup1");; + config.put(ConsumerConfig.GROUP_ID_CONFIG, "springConsumerGroup1"); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName()); + config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); + config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // Adjust package names as needed - return new DefaultKafkaConsumerFactory<>(props); + return new DefaultKafkaConsumerFactory<>(config); } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/model/Notification.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/model/Notification.java new file mode 100644 index 0000000..8c028ec --- /dev/null +++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/model/Notification.java @@ -0,0 +1,55 @@ +package com.itsnaveenk.spring_kafka_consumer_elastic.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +public class Notification { + + private int user_Id; + + + private String message; + + private int recipient_Id; + + public Notification() { + } + public Notification(int user_Id, String message, int recipient_Id) { + this.user_Id = user_Id; + this.message = message; + this.recipient_Id = recipient_Id; + } + + public int getUser_Id() { + return user_Id; + } + + public void setUser_Id(int user_Id) { + this.user_Id = user_Id; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public int getRecipient_Id() { + return recipient_Id; + } + + public void setRecipient_Id(int recipient_Id) { + this.recipient_Id = recipient_Id; + } + + @Override + public String toString() { + return "Notification{" + + "user_Id=" + user_Id + + ", message='" + message + '\'' + + ", recipient_Id=" + recipient_Id + + '}'; + } +} diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java index 106cec5..071e2a8 100644 --- a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java +++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java @@ -1,16 +1,16 @@ package com.itsnaveenk.spring_kafka_consumer_elastic.service; -import com.itsnaveenk.spring_kafka_consumer_elastic.Entity.FoodOrderNotification; -import org.springframework.stereotype.Service; +import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification; import org.springframework.kafka.annotation.KafkaListener; - - +import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { - @KafkaListener(topics = "foodsOrder", groupId = "springConsumerGroup1") - public void consume(FoodOrderNotification foodOrderNotification) { - System.out.println("Consumed message: " + foodOrderNotification.toString()); + @KafkaListener(topics = "foodsOrder", groupId = "${spring.kafka.consumer.group-id}") + public void listen(Notification notification) { + System.out.println("Received notification: " + notification); + // Process the notification } -} \ No newline at end of file + +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index c8551b0..085c848 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,4 +1,5 @@ spring.application.name=spring-kafka-consumer-elastic server.port=8081 -spring.kafka.bootstrap-servers=localhost:9092 \ No newline at end of file +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=springConsumerGroup1 \ No newline at end of file