diff --git a/pom.xml b/pom.xml
index 38676ff..6a74c92 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,10 +39,12 @@
spring-kafka
- org.springframework.boot
- spring-boot-starter-web
+ org.modelmapper
+ modelmapper
+ 2.4.4
+
org.springframework.boot
spring-boot-starter-test
diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/ModelMapperConfig.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/ModelMapperConfig.java
new file mode 100644
index 0000000..77448fe
--- /dev/null
+++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/configs/ModelMapperConfig.java
@@ -0,0 +1,14 @@
+package com.itsnaveenk.spring_kafka_consumer_elastic.configs;
+
+import org.modelmapper.ModelMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ModelMapperConfig {
+
+ @Bean
+ public ModelMapper modelMapper() {
+ return new ModelMapper();
+ }
+}
diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/entity/NotificationEntity.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/entity/NotificationEntity.java
new file mode 100644
index 0000000..b639b25
--- /dev/null
+++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/entity/NotificationEntity.java
@@ -0,0 +1,53 @@
+package com.itsnaveenk.spring_kafka_consumer_elastic.entity;
+
+import org.springframework.data.elasticsearch.annotations.Document;
+
+@Document(indexName = "notification")
+public class NotificationEntity {
+
+ private int user_id;
+ private String message;
+ private int recipient_id;
+
+ public NotificationEntity() {
+ }
+
+ public NotificationEntity(int user_id, String message, int recipient_id) {
+ this.user_id = user_id;
+ this.message = message;
+ this.recipient_id = recipient_id;
+ }
+
+ public int getUser_id() {
+ return user_id;
+ }
+
+ public void setUser_id(int user_id) {
+ this.user_id = user_id;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public int getRecipient_id() {
+ return recipient_id;
+ }
+
+ public void setRecipient_id(int recipient_id) {
+ this.recipient_id = recipient_id;
+ }
+
+ @Override
+ public String toString() {
+ return "NotificationEntity{" +
+ "user_id=" + user_id +
+ ", message='" + message + '\'' +
+ ", recipient_id=" + recipient_id +
+ '}';
+ }
+}
diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/repository/ElasticRepository.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/repository/ElasticRepository.java
new file mode 100644
index 0000000..eb85714
--- /dev/null
+++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/repository/ElasticRepository.java
@@ -0,0 +1,13 @@
+package com.itsnaveenk.spring_kafka_consumer_elastic.repository;
+
+
+import com.itsnaveenk.spring_kafka_consumer_elastic.entity.NotificationEntity;
+import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface ElasticRepository extends ElasticsearchRepository {
+ List findByMessage(String text);
+}
diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/ElasticsearchService.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/ElasticsearchService.java
new file mode 100644
index 0000000..2354d2b
--- /dev/null
+++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/ElasticsearchService.java
@@ -0,0 +1,30 @@
+package com.itsnaveenk.spring_kafka_consumer_elastic.service;
+
+
+import com.itsnaveenk.spring_kafka_consumer_elastic.entity.NotificationEntity;
+import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification;
+import com.itsnaveenk.spring_kafka_consumer_elastic.repository.ElasticRepository;
+import org.modelmapper.ModelMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+
+@Service
+public class ElasticsearchService {
+
+ private final ElasticRepository elasticRepository;
+ private final ModelMapper modelMapper;
+
+ @Autowired
+ public ElasticsearchService(ElasticRepository elasticRepository, ModelMapper modelMapper) {
+ this.elasticRepository = elasticRepository;
+ this.modelMapper = modelMapper;
+ }
+
+ public void saveNotification(Notification notification) {
+ NotificationEntity notificationEntity = modelMapper.map(notification, NotificationEntity.class);
+ elasticRepository.save(notificationEntity);
+ }
+
+
+}
diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java
index a894263..bf13b6f 100644
--- a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java
+++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/KafkaConsumerService.java
@@ -1,15 +1,23 @@
package com.itsnaveenk.spring_kafka_consumer_elastic.service;
+import com.itsnaveenk.spring_kafka_consumer_elastic.entity.NotificationEntity;
import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
+ @Autowired
+ ElasticsearchService elasticsearchService;
+
@KafkaListener(topics = "foodsOrder", groupId = "${spring.kafka.consumer.group-id}")
public void listen(Notification notification) {
System.out.println("Received notification: " + notification);
// Process the notification
+ elasticsearchService.saveNotification(notification);
+
}
}
diff --git a/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/NotificationMapper.java b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/NotificationMapper.java
new file mode 100644
index 0000000..7835726
--- /dev/null
+++ b/src/main/java/com/itsnaveenk/spring_kafka_consumer_elastic/service/NotificationMapper.java
@@ -0,0 +1,57 @@
+package com.itsnaveenk.spring_kafka_consumer_elastic.service;
+
+import com.itsnaveenk.spring_kafka_consumer_elastic.entity.NotificationEntity;
+import com.itsnaveenk.spring_kafka_consumer_elastic.model.Notification;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Custom DTO for Notification
+ */
+public class NotificationMapper implements Serializable {
+ private final int user_id;
+ private final String message;
+ private final int recipient_id;
+
+ public NotificationMapper(int user_id, String message, int recipient_id) {
+ this.user_id = user_id;
+ this.message = message;
+ this.recipient_id = recipient_id;
+ }
+
+ public int getUser_id() {
+ return user_id;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public int getRecipient_id() {
+ return recipient_id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ NotificationMapper entity = (NotificationMapper) o;
+ return Objects.equals(this.user_id, entity.user_id) &&
+ Objects.equals(this.message, entity.message) &&
+ Objects.equals(this.recipient_id, entity.recipient_id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(user_id, message, recipient_id);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(" +
+ "user_id = " + user_id + ", " +
+ "message = " + message + ", " +
+ "recipient_id = " + recipient_id + ")";
+ }
+}