Skip to content

Commit

Permalink
feat: create basic kafka consumer (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusvictor authored Nov 27, 2024
1 parent 85773cc commit cbf1f62
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 29 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
# payments-hexagonal
A Spring Boot API developed in Hexagonal Architecture

## Docker compose
```bash
docker-compose up -d
```

## Kafka
* Consume messages for debugging:
```bash
kafka-console-consumer --bootstrap-server localhost:9092 --topic payments.info --from-beginning
```

## Requirements
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.mv.hexagonal.payments.adapters.in.consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Slf4j
@Component
public class PaymentInfoConsumer {
@KafkaListener(
topics = "${topics.payment-info.name}",
groupId = "${topics.payment-info.group-id}")
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payment info: {}", consumerRecord);
log.info("consumer key: {}", consumerRecord.key());
log.info("consumer value: {}", consumerRecord.value());
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.mv.hexagonal.payments.adapters.out.producer;
package com.mv.hexagonal.payments.adapters.out;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,39 +37,37 @@ public Payment execute(Payment newPayment) {

var paymentCreated = createPaymentOutputPort.create(newPayment);

validateTransaction(fromUser, paymentCreated);
executeTransaction(fromUser, toUser, paymentCreated);

publishPaymentInfo(paymentCreated);
try {
validateTransaction(fromUser, paymentCreated);
executeTransaction(fromUser, toUser, paymentCreated);
} finally {
postActionTransaction(paymentCreated);
}

return paymentCreated;
}

private void executeTransaction(User fromUser, User toUser, Payment paymentCreated) {
private void executeTransaction(User fromUser, User toUser, Payment payment) {
try {
transferBalanceOutputPort.transfer(fromUser, toUser, paymentCreated.getAmount());
paymentCreated.approve();
transferBalanceOutputPort.transfer(fromUser, toUser, payment.getAmount());
payment.approve();
} catch (Exception ex) {
log.error("Error while executing payment", ex);
paymentCreated.cancelByError();
throw new FailedDependencyException("Error while executing payment", paymentCreated);
} finally {
updatePaymentOutputPort.update(paymentCreated);
payment.cancelByError();
throw new FailedDependencyException("Error while executing payment", payment);
}
}

private void validateTransaction(User fromUser, Payment paymentCreated) {
private void validateTransaction(User fromUser, Payment payment) {
try {
validateIfSenderHasEnoughBalance(fromUser, paymentCreated);
validateIfTransactionIsSafe(paymentCreated);
validateIfSenderHasEnoughBalance(fromUser, payment);
validateIfTransactionIsSafe(payment);
} catch (FraudValidationException ex) {
paymentCreated.cancelByFraud();
throw new BadRequestException("Fraud detected", paymentCreated);
payment.cancelByFraud();
throw new BadRequestException("Fraud detected", payment);
} catch (InsufficientAmountException ex) {
paymentCreated.cancelByInsufficientAmount();
throw new BadRequestException("Insufficient amount", paymentCreated);
} finally {
updatePaymentOutputPort.update(paymentCreated);
payment.cancelByInsufficientAmount();
throw new BadRequestException("Insufficient amount", payment);
}
}

Expand All @@ -95,7 +93,8 @@ private void validateIfTransactionIsSafe(Payment payment) {
validateTransactionByFraudOutputPort.validate(payment);
}

private void publishPaymentInfo(Payment payment) {
private void postActionTransaction(Payment payment) {
updatePaymentOutputPort.update(payment);
publishPaymentInfoOutputPort.publish(payment);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.mv.hexagonal.payments.configs;

import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${topics.bootstrap-address}")
private String bootstrapAddress;

@Value(value = "${topics.payment-info.group-id}")
private String groupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// props.put(GROUP_ID_CONFIG, groupId);

return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
return factory;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.mv.hexagonal.payments.configs;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand All @@ -16,11 +16,17 @@

@Configuration
public class KafkaProducerConfig {
@Value("${topics.bootstrap-address}")
private String bootstrapAddress;

@Value("${topics.payment-info.group-id}")
private String groupId;

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(GROUP_ID_CONFIG, "mv");
configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
// configProps.put(GROUP_ID_CONFIG, groupId);
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
Expand Down
9 changes: 5 additions & 4 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ topics:
name: payments.info
partitions: 3
replication-factor: 1
group-id: payments-group01


# Kafka Logs
logging:
level:
org:
springframework.kafka.*: OFF
springframework:
kafka:
listener:
[KafkaMessageListenerContainer$ListenerConsumer]: OFF
# springframework:
# kafka:
# listener:
# [KafkaMessageListenerContainer$ListenerConsumer]: OFF
org.apache.kafka.*: OFF
kafka.*: OFF
org.apache.zookeeper.*: OFF

0 comments on commit cbf1f62

Please sign in to comment.