Skip to content

Commit

Permalink
"error with deserialization"
Browse files Browse the repository at this point in the history
  • Loading branch information
itsnaveenk committed Sep 5, 2024
1 parent 9e58e74 commit ef504c3
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 69 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.itsnaveenk.spring_kafka_consumer_elastic.configs;

import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -20,23 +21,24 @@ public class KafkaConsumerConfig {
private String bootstrapAddress;

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
public ConsumerFactory<String, Notification> consumerFactory() {

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "springConsumerGroup1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,ErrorHandlingDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // Adjust package names as needed
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "springConsumerGroup1");;
config.put(ConsumerConfig.GROUP_ID_CONFIG, "springConsumerGroup1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class.getName());
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // Adjust package names as needed


return new DefaultKafkaConsumerFactory<>(props);
return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
public ConcurrentKafkaListenerContainerFactory<String, Notification> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Notification> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.itsnaveenk.spring_kafka_consumer_elastic.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;

public class Notification {

private int user_Id;


private String message;

private int recipient_Id;

public Notification() {
}
public Notification(int user_Id, String message, int recipient_Id) {
this.user_Id = user_Id;
this.message = message;
this.recipient_Id = recipient_Id;
}

public int getUser_Id() {
return user_Id;
}

public void setUser_Id(int user_Id) {
this.user_Id = user_Id;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

public int getRecipient_Id() {
return recipient_Id;
}

public void setRecipient_Id(int recipient_Id) {
this.recipient_Id = recipient_Id;
}

@Override
public String toString() {
return "Notification{" +
"user_Id=" + user_Id +
", message='" + message + '\'' +
", recipient_Id=" + recipient_Id +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.itsnaveenk.spring_kafka_consumer_elastic.service;

import com.itsnaveenk.spring_kafka_consumer_elastic.Entity.FoodOrderNotification;
import org.springframework.stereotype.Service;
import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification;
import org.springframework.kafka.annotation.KafkaListener;


import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

@KafkaListener(topics = "foodsOrder", groupId = "springConsumerGroup1")
public void consume(FoodOrderNotification foodOrderNotification) {
System.out.println("Consumed message: " + foodOrderNotification.toString());
@KafkaListener(topics = "foodsOrder", groupId = "${spring.kafka.consumer.group-id}")
public void listen(Notification notification) {
System.out.println("Received notification: " + notification);
// Process the notification
}
}

}
3 changes: 2 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
spring.application.name=spring-kafka-consumer-elastic
server.port=8081

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=springConsumerGroup1

0 comments on commit ef504c3

Please sign in to comment.