Skip to content

Commit

Permalink
#1180 & #1237 create payment and order in paypal
Browse files Browse the repository at this point in the history
  • Loading branch information
tuannguyenh1 committed Nov 6, 2024
1 parent af2c942 commit 5ff62c9
Show file tree
Hide file tree
Showing 53 changed files with 1,659 additions and 80 deletions.
16 changes: 16 additions & 0 deletions kafka/connects/debezium-checkout-status.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "dbcheckout-status",
"database.user": "admin",
"database.dbname": "order",
"database.hostname": "postgres",
"database.password": "admin",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.checkout",
"slot.name": "checkout_status_slot"
}
16 changes: 16 additions & 0 deletions kafka/connects/debezium-payment.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "dbpayment",
"database.user": "admin",
"database.dbname": "payment",
"database.hostname": "postgres",
"database.password": "admin",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.include.list": "public",
"table.include.list": "public.payment",
"slot.name": "payment_slot"
}
13 changes: 13 additions & 0 deletions order/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
Expand All @@ -51,6 +59,11 @@
<groupId>org.liquibase</groupId>
<artifactId>liquibase-core</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yas</groupId>
<artifactId>common-library</artifactId>
Expand Down
8 changes: 7 additions & 1 deletion order/src/it/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,10 @@ spring.security.oauth2.resourceserver.jwt.issuer-uri=test
springdoc.oauthflow.authorization-url=test
springdoc.oauthflow.token-url=test
spring.jpa.open-in-view=true
cors.allowed-origins=*
cors.allowed-origins=*

cdc.event.checkout.status.topic-name=dbcheckout-status.public.checkout
cdc.event.checkout.status.group-id=checkout-status

cdc.event.payment.topic-name=dbpayment.public.payment
cdc.event.payment.update.group-id=payment-update
22 changes: 22 additions & 0 deletions order/src/main/java/com/yas/order/config/JsonConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.yas.order.config;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class JsonConfig {

@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}

@Bean
public Gson gson() {
return new Gson();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

@ConfigurationProperties(prefix = "yas.services")
public record ServiceUrlConfig(
String cart, String customer, String product, String tax, String promotion) {
String cart, String customer, String product, String tax, String promotion, String payment) {
}
144 changes: 144 additions & 0 deletions order/src/main/java/com/yas/order/consumer/OrderStatusConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.yas.order.consumer;

import static com.yas.order.utils.JsonUtils.convertObjectToString;
import static com.yas.order.utils.JsonUtils.createJsonErrorObject;
import static com.yas.order.utils.JsonUtils.getAttributesNode;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.yas.commonlibrary.exception.BadRequestException;
import com.yas.commonlibrary.exception.NotFoundException;
import com.yas.order.model.Checkout;
import com.yas.order.model.enumeration.CheckoutProgress;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.repository.CheckoutRepository;
import com.yas.order.service.PaymentService;
import com.yas.order.utils.Constants;
import com.yas.order.viewmodel.payment.CheckoutPaymentVm;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderStatusConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderStatusConsumer.class);
private final PaymentService paymentService;
private final CheckoutRepository checkoutRepository;
private final ObjectMapper objectMapper;
private final Gson gson;

@KafkaListener(
topics = "${cdc.event.checkout.status.topic-name}",
groupId = "${cdc.event.checkout.status.group-id}"
)
@RetryableTopic(
attempts = "1"
)
public void listen(ConsumerRecord<?, ?> consumerRecord) {

if (Objects.isNull(consumerRecord)) {
LOGGER.info("ConsumerRecord is null");
return;
}
JsonObject valueObject = gson.fromJson((String) consumerRecord.value(), JsonObject.class);
processCheckoutEvent(valueObject);

}

private void processCheckoutEvent(JsonObject valueObject) {
Optional.ofNullable(valueObject)
.filter(value -> value.has("after"))
.map(value -> value.getAsJsonObject("after"))
.ifPresent(this::handleAfterJson);
}

private void handleAfterJson(JsonObject after) {

String id = getJsonValueOrThrow(after, Constants.Column.ID_COLUMN,
Constants.ErrorCode.ID_NOT_EXISTED);
String status = getJsonValueOrThrow(after, Constants.Column.STATUS_COLUMN,
Constants.ErrorCode.STATUS_NOT_EXISTED, id);
String progress = getJsonValueOrThrow(after, Constants.Column.CHECKOUT_PROGRESS_COLUMN,
Constants.ErrorCode.PROGRESS_NOT_EXISTED, id);

if (!isPaymentProcessing(status, progress)) {
LOGGER.info("Checkout record with ID {} lacks the status 'PAYMENT_PROCESSING' and progress 'STOCK_LOCKED'",
id);
return;
}

LOGGER.info("Checkout record with ID {} has the status 'PAYMENT_PROCESSING' and the process 'STOCK_LOCKED'",
id);

Checkout checkout = checkoutRepository
.findById(id)
.orElseThrow(() -> new NotFoundException(Constants.ErrorCode.CHECKOUT_NOT_FOUND, id));

processPaymentAndUpdateCheckout(checkout);
}

private boolean isPaymentProcessing(String status, String process) {
return CheckoutState.PAYMENT_PROCESSING.name().equalsIgnoreCase(status)
&& CheckoutProgress.STOCK_LOCKED.name().equalsIgnoreCase(process);
}

private void processPaymentAndUpdateCheckout(Checkout checkout) {

try {
Long paymentId = processPayment(checkout);
checkout.setProgress(CheckoutProgress.PAYMENT_CREATED);
checkout.setLastError(null);

ObjectNode updatedAttributes = updateAttributesWithPayment(checkout.getAttributes(), paymentId);
checkout.setAttributes(convertObjectToString(objectMapper, updatedAttributes));

} catch (Exception e) {

checkout.setProgress(CheckoutProgress.PAYMENT_CREATED_FAILED);

ObjectNode error = createJsonErrorObject(objectMapper, CheckoutProgress.PAYMENT_CREATED_FAILED.name(),
e.getMessage());
checkout.setLastError(convertObjectToString(objectMapper, error));

LOGGER.error(e.getMessage());
throw new BadRequestException(Constants.ErrorCode.PROCESS_CHECKOUT_FAILED, checkout.getId());

} finally {
checkoutRepository.save(checkout);
}
}

private Long processPayment(Checkout checkout) {

CheckoutPaymentVm requestDto = new CheckoutPaymentVm(
checkout.getId(),
checkout.getPaymentMethodId(),
checkout.getTotalAmount()
);

Long paymentId = paymentService.createPaymentFromEvent(requestDto);
LOGGER.info("Payment created successfully with ID: {}", paymentId);

return paymentId;
}

private ObjectNode updateAttributesWithPayment(String attributes, Long paymentId) {

ObjectNode attributesNode = getAttributesNode(objectMapper, attributes);
attributesNode.put(Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_ID_FIELD, paymentId);

return attributesNode;
}

}
108 changes: 108 additions & 0 deletions order/src/main/java/com/yas/order/consumer/PaymentUpdateConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.yas.order.consumer;

import static com.yas.order.utils.JsonUtils.convertObjectToString;
import static com.yas.order.utils.JsonUtils.getAttributesNode;
import static com.yas.order.utils.JsonUtils.getJsonValueOrNull;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.yas.order.model.Checkout;
import com.yas.order.model.enumeration.CheckoutProgress;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.service.CheckoutService;
import com.yas.order.utils.Constants;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class PaymentUpdateConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(PaymentUpdateConsumer.class);
private final CheckoutService checkoutService;
private final ObjectMapper objectMapper;
private final Gson gson;

@KafkaListener(
topics = "${cdc.event.payment.topic-name}",
groupId = "${cdc.event.payment.update.group-id}"
)
@RetryableTopic
public void listen(ConsumerRecord<?, ?> consumerRecord) {

if (Objects.isNull(consumerRecord)) {
LOGGER.info("Consumer Record is null");
return;
}
JsonObject valueObject = gson.fromJson((String) consumerRecord.value(), JsonObject.class);
processPaymentEvent(valueObject);

}

private void processPaymentEvent(JsonObject valueObject) {
Optional.ofNullable(valueObject)
.filter(
value -> value.has("op") && "u".equals(value.get("op").getAsString())
)
.filter(value -> value.has("before") && value.has("after"))
.ifPresent(this::handleJsonForUpdateCheckout);
}

private void handleJsonForUpdateCheckout(JsonObject valueObject) {

JsonObject before = valueObject.getAsJsonObject("before");
JsonObject after = valueObject.getAsJsonObject("after");

String id = getJsonValueOrThrow(after, Constants.Column.ID_COLUMN,
Constants.ErrorCode.ID_NOT_EXISTED);

String beforePaypalOrderId = getJsonValueOrNull(before,
Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD);
String afterPaypalOrderId = getJsonValueOrNull(after,
Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD);

if (!Objects.isNull(afterPaypalOrderId) && !afterPaypalOrderId.equals(beforePaypalOrderId)) {

LOGGER.info("Handle json for update Checkout with Payment {}", id);

String checkoutId = getJsonValueOrThrow(after, Constants.Column.CHECKOUT_ID_COLUMN,
Constants.ErrorCode.CHECKOUT_ID_NOT_EXISTED);
updateCheckOut(checkoutId, afterPaypalOrderId);
} else {
LOGGER.warn("It is not an event to update an Order on PayPal with Payment ID {}", id);
}
}

private void updateCheckOut(String checkoutId, String paymentProviderCheckoutId) {

Checkout checkout = checkoutService.findCheckoutById(checkoutId);
checkout.setCheckoutState(CheckoutState.PAYMENT_PROCESSING);
checkout.setProgress(CheckoutProgress.PAYMENT_CREATED);

ObjectNode updatedAttributes = updateAttributesWithCheckout(checkout.getAttributes(),
paymentProviderCheckoutId);
checkout.setAttributes(convertObjectToString(objectMapper, updatedAttributes));

checkoutService.updateCheckout(checkout);
}

private ObjectNode updateAttributesWithCheckout(String attributes, String paymentProviderCheckoutId) {

ObjectNode attributesNode = getAttributesNode(objectMapper, attributes);
attributesNode.put(Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD,
paymentProviderCheckoutId);

return attributesNode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ public ResponseEntity<Long> updateCheckoutStatus(@Valid @RequestBody CheckoutSta
return ResponseEntity.ok(checkoutService.updateCheckoutStatus(checkoutStatusPutVm));
}

@GetMapping("/storefront/checkouts/{id}")
public ResponseEntity<CheckoutVm> getOrderWithItemsById(@PathVariable String id) {
@GetMapping("/storefront/checkouts/pending/{id}")
public ResponseEntity<CheckoutVm> getPendingCheckoutDetailsById(@PathVariable String id) {
return ResponseEntity.ok(checkoutService.getCheckoutPendingStateWithItemsById(id));
}

@GetMapping("/storefront/checkouts/{id}")
public ResponseEntity<CheckoutVm> getCheckoutById(@PathVariable String id) {
return ResponseEntity.ok(checkoutService.findCheckoutWithItemsById(id));
}

@PutMapping("/storefront/checkouts/{id}/payment-method")
@ApiResponses(value = {
@ApiResponse(responseCode = ApiConstant.CODE_200, description = ApiConstant.OK,
Expand Down
Loading

0 comments on commit 5ff62c9

Please sign in to comment.