Skip to content

Commit

Permalink
#1180 & #1237 create payment and order in paypal
Browse files Browse the repository at this point in the history
  • Loading branch information
tuannguyenh1 committed Nov 7, 2024
1 parent af2c942 commit 40f6a23
Show file tree
Hide file tree
Showing 60 changed files with 1,767 additions and 91 deletions.
16 changes: 16 additions & 0 deletions kafka/connects/debezium-checkout-status.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "dbcheckout-status",
"database.user": "admin",
"database.dbname": "order",
"database.hostname": "postgres",
"database.password": "admin",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.checkout",
"slot.name": "checkout_status_slot"
}
16 changes: 16 additions & 0 deletions kafka/connects/debezium-payment.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "dbpayment",
"database.user": "admin",
"database.dbname": "payment",
"database.hostname": "postgres",
"database.password": "admin",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.payment",
"slot.name": "payment_slot"
}
13 changes: 13 additions & 0 deletions order/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
Expand All @@ -51,6 +59,11 @@
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yas</groupId>
<artifactId>common-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;

import com.yas.order.config.KafkaIntegrationTestConfiguration;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Import;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest
@Testcontainers
@Import({KafkaIntegrationTestConfiguration.class})
class CustomerServiceIT {
@Container
@ServiceConnection
Expand Down
3 changes: 2 additions & 1 deletion order/src/it/java/com/yas/order/service/OrderServiceIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.yas.commonlibrary.IntegrationTestConfiguration;
import com.yas.commonlibrary.exception.NotFoundException;
import com.yas.order.OrderApplication;
import com.yas.order.config.KafkaIntegrationTestConfiguration;
import com.yas.order.model.Order;
import com.yas.order.model.enumeration.OrderStatus;
import com.yas.order.model.enumeration.PaymentStatus;
Expand Down Expand Up @@ -37,7 +38,7 @@
import org.springframework.data.util.Pair;

@SpringBootTest(classes = OrderApplication.class)
@Import(IntegrationTestConfiguration.class)
@Import({IntegrationTestConfiguration.class, KafkaIntegrationTestConfiguration.class})
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
class OrderServiceIT {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;

import com.yas.order.config.KafkaIntegrationTestConfiguration;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Import;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest
@Testcontainers
@Import({KafkaIntegrationTestConfiguration.class})
class ProductServiceIT {
@Container
@ServiceConnection
Expand Down
3 changes: 3 additions & 0 deletions order/src/it/java/com/yas/order/service/TaxServiceIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.verify;

import com.yas.order.config.KafkaIntegrationTestConfiguration;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Import;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@SpringBootTest
@Testcontainers
@Import({KafkaIntegrationTestConfiguration.class})
class TaxServiceIT {
@Container
@ServiceConnection
Expand Down
19 changes: 18 additions & 1 deletion order/src/it/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,21 @@ spring.security.oauth2.resourceserver.jwt.issuer-uri=test
springdoc.oauthflow.authorization-url=test
springdoc.oauthflow.token-url=test
spring.jpa.open-in-view=true
cors.allowed-origins=*
cors.allowed-origins=*

cdc.event.checkout.status.topic-name=dbcheckout-status.public.checkout
cdc.event.checkout.status.group-id=checkout-status

cdc.event.payment.topic-name=dbpayment.public.payment
cdc.event.payment.update.group-id=payment-update

kafka.version=7.0.9

spring.kafka.consumer.bootstrap-servers=kafka:9092
spring.aop.proxy-target-class=true

spring.kafka.producer.bootstrap-servers=kafka:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.bootstrap-servers=localhost:9092
22 changes: 22 additions & 0 deletions order/src/main/java/com/yas/order/config/JsonConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.yas.order.config;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class JsonConfig {

@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}

@Bean
public Gson gson() {
return new Gson();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

@ConfigurationProperties(prefix = "yas.services")
public record ServiceUrlConfig(
String cart, String customer, String product, String tax, String promotion) {
String cart, String customer, String product, String tax, String promotion, String payment) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ public ResponseEntity<Long> updateCheckoutStatus(@Valid @RequestBody CheckoutSta
return ResponseEntity.ok(checkoutService.updateCheckoutStatus(checkoutStatusPutVm));
}

@GetMapping("/storefront/checkouts/{id}")
public ResponseEntity<CheckoutVm> getOrderWithItemsById(@PathVariable String id) {
@GetMapping("/storefront/checkouts/pending/{id}")
public ResponseEntity<CheckoutVm> getPendingCheckoutDetailsById(@PathVariable String id) {
return ResponseEntity.ok(checkoutService.getCheckoutPendingStateWithItemsById(id));
}

@GetMapping("/storefront/checkouts/{id}")
public ResponseEntity<CheckoutVm> getCheckoutById(@PathVariable String id) {
return ResponseEntity.ok(checkoutService.findCheckoutWithItemsById(id));
}

@PutMapping("/storefront/checkouts/{id}/payment-method")
@ApiResponses(value = {
@ApiResponse(responseCode = ApiConstant.CODE_200, description = ApiConstant.OK,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.yas.order.kafka;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

@EnableKafka
@Configuration
public class AppKafkaListenerConfigurer implements KafkaListenerConfigurer {

private LocalValidatorFactoryBean validator;

public AppKafkaListenerConfigurer(LocalValidatorFactoryBean validator) {
this.validator = validator;
}

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
// Enable message validation
registrar.setValidator(this.validator);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.yas.order.kafka.consumer;

import static com.yas.order.utils.JsonUtils.convertObjectToString;
import static com.yas.order.utils.JsonUtils.createJsonErrorObject;
import static com.yas.order.utils.JsonUtils.getAttributesNode;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.yas.commonlibrary.exception.BadRequestException;
import com.yas.commonlibrary.exception.NotFoundException;
import com.yas.order.model.Checkout;
import com.yas.order.model.enumeration.CheckoutProgress;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.repository.CheckoutRepository;
import com.yas.order.service.PaymentService;
import com.yas.order.utils.Constants;
import com.yas.order.viewmodel.payment.CheckoutPaymentVm;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderStatusConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderStatusConsumer.class);
private final PaymentService paymentService;
private final CheckoutRepository checkoutRepository;
private final ObjectMapper objectMapper;
private final Gson gson;

@KafkaListener(
topics = "${cdc.event.checkout.status.topic-name}",
groupId = "${cdc.event.checkout.status.group-id}"
)
@RetryableTopic(
attempts = "1"
)
public void listen(ConsumerRecord<?, ?> consumerRecord) {

if (Objects.isNull(consumerRecord)) {
LOGGER.info("ConsumerRecord is null");
return;
}
JsonObject valueObject = gson.fromJson((String) consumerRecord.value(), JsonObject.class);
processCheckoutEvent(valueObject);

}

private void processCheckoutEvent(JsonObject valueObject) {
Optional.ofNullable(valueObject)
.filter(value -> value.has("after"))
.map(value -> value.getAsJsonObject("after"))
.ifPresent(this::handleAfterJson);
}

private void handleAfterJson(JsonObject after) {

String id = getJsonValueOrThrow(after, Constants.Column.ID_COLUMN,
Constants.ErrorCode.ID_NOT_EXISTED);
String status = getJsonValueOrThrow(after, Constants.Column.STATUS_COLUMN,
Constants.ErrorCode.STATUS_NOT_EXISTED, id);
String progress = getJsonValueOrThrow(after, Constants.Column.CHECKOUT_PROGRESS_COLUMN,
Constants.ErrorCode.PROGRESS_NOT_EXISTED, id);

if (!isPaymentProcessing(status, progress)) {
LOGGER.info("Checkout record with ID {} lacks the status 'PAYMENT_PROCESSING' and progress 'STOCK_LOCKED'",
id);
return;
}

LOGGER.info("Checkout record with ID {} has the status 'PAYMENT_PROCESSING' and the process 'STOCK_LOCKED'",
id);

Checkout checkout = checkoutRepository
.findById(id)
.orElseThrow(() -> new NotFoundException(Constants.ErrorCode.CHECKOUT_NOT_FOUND, id));

processPaymentAndUpdateCheckout(checkout);
}

private boolean isPaymentProcessing(String status, String process) {
return CheckoutState.PAYMENT_PROCESSING.name().equalsIgnoreCase(status)
&& CheckoutProgress.STOCK_LOCKED.name().equalsIgnoreCase(process);
}

private void processPaymentAndUpdateCheckout(Checkout checkout) {

try {
Long paymentId = processPayment(checkout);
checkout.setProgress(CheckoutProgress.PAYMENT_CREATED);
checkout.setLastError(null);

ObjectNode updatedAttributes = updateAttributesWithPayment(checkout.getAttributes(), paymentId);
checkout.setAttributes(convertObjectToString(objectMapper, updatedAttributes));

} catch (Exception e) {

checkout.setProgress(CheckoutProgress.PAYMENT_CREATED_FAILED);

ObjectNode error = createJsonErrorObject(objectMapper, CheckoutProgress.PAYMENT_CREATED_FAILED.name(),
e.getMessage());
checkout.setLastError(convertObjectToString(objectMapper, error));

LOGGER.error(e.getMessage());
throw new BadRequestException(Constants.ErrorCode.PROCESS_CHECKOUT_FAILED, checkout.getId());

} finally {
checkoutRepository.save(checkout);
}
}

private Long processPayment(Checkout checkout) {

CheckoutPaymentVm requestDto = new CheckoutPaymentVm(
checkout.getId(),
checkout.getPaymentMethodId(),
checkout.getTotalAmount()
);

Long paymentId = paymentService.createPaymentFromEvent(requestDto);
LOGGER.info("Payment created successfully with ID: {}", paymentId);

return paymentId;
}

private ObjectNode updateAttributesWithPayment(String attributes, Long paymentId) {

ObjectNode attributesNode = getAttributesNode(objectMapper, attributes);
attributesNode.put(Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_ID_FIELD, paymentId);

return attributesNode;
}

}
Loading

0 comments on commit 40f6a23

Please sign in to comment.