Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/refactor imposter reactive actions #62

Merged
merged 10 commits into from
Mar 13, 2022
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (C) 2021 Finarkein Analytics Pvt. Ltd.
* All rights reserved This software is the confidential and proprietary information of Finarkein Analytics Pvt. Ltd.
* You shall not disclose such confidential information and shall use it only in accordance with the terms of the license
* agreement you entered into with Finarkein Analytics Pvt. Ltd.
*/
package io.finarkein.fiul.config;

import lombok.Getter;
import lombok.ToString;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Configuration;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.Optional;

@Configuration
@Getter
@Log4j2
public class DBCallHandlerSchedulerConfig {
@ToString.Exclude
private final Scheduler scheduler;

static final String PROPERTY_PREFIX = "fiul.db-io-handler";

public DBCallHandlerSchedulerConfig() {
final int threadCap = Optional
.ofNullable(System.getProperty(PROPERTY_PREFIX + ".threadCap"))
.map(Integer::parseInt)
.orElseGet(() -> 10 * Runtime.getRuntime().availableProcessors());
final int queueSize = Optional
.ofNullable(System.getProperty(PROPERTY_PREFIX + ".queue-size"))
.map(Integer::parseInt)
.orElse(100000);
scheduler = Schedulers.newBoundedElastic(threadCap, queueSize, "db-io-handler");
log.info("name:db-io-handler, scheduler-type:boundedElastic, threadCap:{}, queueSize:{}"
, threadCap, queueSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import reactor.blockhound.BlockHound;

@Log4j2
@EntityScan(basePackages = {"io.finarkein.fiul", "io.finarkein.api.aa"})
Expand All @@ -22,7 +23,12 @@
public class FiulServerApplication {

public static void main(String[] args) {
try {
BlockHound.install();
}catch (Exception e){
e.printStackTrace();
}

primarypi marked this conversation as resolved.
Show resolved Hide resolved
SpringApplication.run(FiulServerApplication.class, args);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spring.jpa.generate-ddl=true
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
# Log JPA queries
# Comment this in production
#spring.jpa.show-sql=true
spring.jpa.show-sql=true
# Drop and create new tables (create, create-drop, validate, update)
# Comment this in production
spring.jpa.hibernate.ddl-auto=update
Expand Down
5 changes: 5 additions & 0 deletions fiul-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,10 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0.RC1</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,17 @@ public Mono<ConsentHandleResponse> getConsentStatus(String consentHandle, Option
return buildFromConsentState(consentHandle)
.get()
.map(Mono::just)
primarypi marked this conversation as resolved.
Show resolved Hide resolved
.orElseGet(() -> aaNameOptional
.or(consentRequestAANameByConsentHandle(consentHandle))
.map(aaName -> aafiuClient.getConsentStatus(consentHandle, aaName)
.flatMap(consentHandleResponse -> {
final Optional<ConsentStateDTO> optionalConsentState = consentStore
.getConsentStateByHandle(consentHandle);
if (optionalConsentState.isPresent()
&& Objects.nonNull(optionalConsentState.get().getPostConsentResponseTimestamp())
&& strToTimeStamp.apply(consentHandleResponse.getTimestamp())
.before(optionalConsentState.get().getPostConsentResponseTimestamp())) {
throw Errors.InvalidRequest.with(optionalConsentState.get().getTxnId(),
"Invalid consent handle response timestamp : " + consentHandleResponse.getTimestamp());
}
return Mono.just(consentHandleResponse);
})
)
.orElseThrow(() -> Errors.NoDataFound.with(UUIDSupplier.get(), "ConsentHandle not found, try with aaHandle"))
.orElseGet(() -> {
if (aaNameOptional.isPresent()) {
String aaName = aaNameOptional.get();
return getConsentHandleResponseMono(consentHandle, aaName);
}
return consentRequestAANameByConsentHandle(consentHandle)
.flatMap(optionalAAName -> optionalAAName
.map(aaName -> getConsentHandleResponseMono(consentHandle, aaName))
.orElseThrow(() -> Errors.NoDataFound.with(UUIDSupplier.get(), "ConsentHandle not found, try with aaHandle"))
);
}
).doOnSuccess(consentHandleResponse -> {
log.debug("GetConsentStatus: success: response:{}", consentHandleResponse);
consentStateUpdateHelper(consentHandleResponse.getTxnid(), consentHandleResponse.getConsentStatus().getId(),
Expand All @@ -134,6 +128,22 @@ public Mono<ConsentHandleResponse> getConsentStatus(String consentHandle, Option
.doOnError(error -> log.error("GetConsentStatus: error:{}", error.getMessage(), error));
}

private Mono<ConsentHandleResponse> getConsentHandleResponseMono(String consentHandle, String aaName) {
return aafiuClient.getConsentStatus(consentHandle, aaName)
.flatMap(consentHandleResponse -> consentStore.consentStateByHandle(consentHandle)
.flatMap(optionalConsentState -> {
if (optionalConsentState.isPresent()
&& Objects.nonNull(optionalConsentState.get().getPostConsentResponseTimestamp())
&& strToTimeStamp.apply(consentHandleResponse.getTimestamp())
.before(optionalConsentState.get().getPostConsentResponseTimestamp())) {
throw Errors.InvalidRequest.with(optionalConsentState.get().getTxnId(),
"Invalid consent handle response timestamp : " + consentHandleResponse.getTimestamp());
}
return Mono.just(consentHandleResponse);
})
);
}

private void consentStateUpdateHelper(String txnId, String consentId, String consentStatus) {
ConsentStateDTO consentStateDTO = consentStore.getConsentStateByTxnId(txnId);
if (consentStateDTO != null) {
Expand All @@ -143,8 +153,10 @@ private void consentStateUpdateHelper(String txnId, String consentId, String con
}
}

private Supplier<Optional<String>> consentRequestAANameByConsentHandle(final String consentHandle) {
return () -> consentStore.findRequestByConsentHandle(consentHandle).map(ConsentRequestDTO::getAaName);
private Mono<Optional<String>> consentRequestAANameByConsentHandle(final String consentHandle) {
return consentStore.findRequestByConsentHandle(consentHandle)
.map(optionalConsentRequestDTO -> optionalConsentRequestDTO.map(ConsentRequestDTO::getAaName))
;
}

private Supplier<Optional<String>> consentRequestAANameByConsentId(final String consentId) {
Expand Down Expand Up @@ -215,7 +227,7 @@ protected Mono<SignedConsentDTO> fetchAndSaveSignedConsent(final String consentI
return (consentState != null) ?
Optional.ofNullable(consentState.getAaId()) : Optional.empty();
})
.orElseThrow(() -> Errors.NoDataFound.with(UUIDSupplier.get(),"SignedConsent cannot be found, try with aaHandle")
.orElseThrow(() -> Errors.NoDataFound.with(UUIDSupplier.get(), "SignedConsent cannot be found, try with aaHandle")
.params(Map.of("consentId", consentId)));
return aafiuClient.getConsentArtefact(consentId, aaHandle)
.map(consentArtefact -> {
Expand Down Expand Up @@ -264,9 +276,12 @@ public void handleConsentNotification(ConsentNotificationLog consentNotification
@Override
public Mono<ConsentStateDTO> getConsentState(String consentHandle, Optional<String> aaHandle) {
log.debug("GetConsentState: start: consentHandle:{}, aaHandle:{}", consentHandle, aaHandle);
return consentStore.getConsentStateByHandle(consentHandle)
.map(Mono::just)
.orElseGet(() -> fetchConsentStatus(consentHandle, aaHandle))
return consentStore.consentStateByHandle(consentHandle)
.flatMap(optionalConsentStateDTO -> {
if (optionalConsentStateDTO.isEmpty())
return fetchConsentStatus(consentHandle, aaHandle);
return Mono.just(optionalConsentStateDTO.get());
})
.doOnSuccess(artefact -> log.debug("GetConsentState: success: consentHandle:{}, aaHandle:{}", consentHandle, aaHandle))
.doOnError(error -> log.error("GetConsentState: error:{}", error.getMessage(), error));
}
Expand Down Expand Up @@ -300,28 +315,38 @@ public void updateConsentState(ConsentStateDTO consentStateDTO) {
consentStore.updateConsentState(consentStateDTO);
}

private Mono<ConsentStateDTO> fetchConsentStatus(String consentHandle, Optional<String> aaHandle) {
return aaHandle
.or(() -> consentStore.findRequestByConsentHandle(consentHandle).map(ConsentRequestDTO::getAaName))
.map(aaName ->
aafiuClient
.getConsentStatus(consentHandle, aaName)
.flatMap(consentStatusResponse -> {
final var state = new ConsentStateDTO();
state.setConsentHandle(consentStatusResponse.getConsentHandle());
state.setConsentId(consentStatusResponse.getConsentStatus().getId());
state.setConsentStatus(consentStatusResponse.getConsentStatus().getStatus());
state.setTxnId(consentStatusResponse.getTxnid());
state.setAaId(aaName);

//saving consentState
consentStore.saveConsentState(state);
return Mono.just(state);
})
.onErrorMap(throwable -> Errors.InvalidRequest.with(UUIDSupplier.get(),
throwable.getMessage(), throwable))
)
.orElseThrow(() -> Errors.InvalidRequest.with(UUIDSupplier.get(),
"Unable to get status for given consentHandle:" + consentHandle + ", try with aaHandle"));
private Mono<ConsentStateDTO> fetchConsentStatus(String consentHandle, Optional<String> optionalAaHandle) {
if (optionalAaHandle.isPresent()) {
String aaName = optionalAaHandle.get();
return getConsentStateDTOMono(consentHandle, aaName);
}
return consentStore
.findRequestByConsentHandle(consentHandle)
.flatMap(optionalConsentRequest ->
optionalConsentRequest
.map(ConsentRequestDTO::getAaName)
.map(aaName -> getConsentStateDTOMono(consentHandle, aaName))
.orElseThrow(() -> Errors.InvalidRequest.with(UUIDSupplier.get(),
"Unable to get status for given consentHandle:" + consentHandle + ", try with aaHandle"))
);
}

private Mono<ConsentStateDTO> getConsentStateDTOMono(String consentHandle, String aaName) {
return aafiuClient
.getConsentStatus(consentHandle, aaName)
.flatMap(consentStatusResponse -> {
final var state = new ConsentStateDTO();
state.setConsentHandle(consentStatusResponse.getConsentHandle());
state.setConsentId(consentStatusResponse.getConsentStatus().getId());
state.setConsentStatus(consentStatusResponse.getConsentStatus().getStatus());
state.setTxnId(consentStatusResponse.getTxnid());
state.setAaId(aaName);

//saving consentState
consentStore.saveConsentState(state);
return Mono.just(state);
})
.onErrorMap(throwable -> Errors.InvalidRequest.with(UUIDSupplier.get(),
throwable.getMessage(), throwable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.finarkein.api.aa.notification.ConsentStatusNotification;
import io.finarkein.api.aa.notification.Notifier;
import io.finarkein.api.aa.util.Functions;
import io.finarkein.fiul.config.DBCallHandlerSchedulerConfig;
import io.finarkein.fiul.consent.model.ConsentNotificationLog;
import io.finarkein.fiul.consent.model.ConsentRequestDTO;
import io.finarkein.fiul.consent.model.ConsentStateDTO;
Expand All @@ -25,6 +26,7 @@
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.Optional;

Expand All @@ -47,6 +49,9 @@ class ConsentStoreImpl implements ConsentStore {
@Autowired
private RepoSignedConsent repoSignedConsent;

@Autowired
protected DBCallHandlerSchedulerConfig dbBlockingCallSchedulerConfig;

@Override
public void saveConsentRequest(String consentHandle, ConsentRequest consentRequest) {
ConsentRequestDTO consentRequestDTO = ConsentRequestDTO.builder()
Expand Down Expand Up @@ -91,8 +96,9 @@ public Optional<SignedConsentDTO> findSignedConsent(String consentId) {
}

@Override
public Optional<ConsentRequestDTO> findRequestByConsentHandle(String consentHandle) {
return consentRequestDTORepository.findById(consentHandle);
public Mono<Optional<ConsentRequestDTO>> findRequestByConsentHandle(String consentHandle) {
return Mono.fromCallable(() -> consentRequestDTORepository.findById(consentHandle))
.subscribeOn(dbBlockingCallSchedulerConfig.getScheduler());
}

@Override
Expand Down Expand Up @@ -146,6 +152,12 @@ public Optional<ConsentStateDTO> getConsentStateByHandle(String consentHandle) {
return consentStateRepository.findById(consentHandle);
}

@Override
public Mono<Optional<ConsentStateDTO>> consentStateByHandle(String consentHandle) {
return Mono.fromCallable(() -> consentStateRepository.findById(consentHandle))
.subscribeOn(dbBlockingCallSchedulerConfig.getScheduler());
}

@Override
public ConsentStateDTO getConsentStateById(String consentId) {
return consentStateRepository.findByConsentId(consentId).orElse(null);
Expand All @@ -156,6 +168,12 @@ public ConsentStateDTO getConsentStateByTxnId(String txnId) {
return consentStateRepository.findByTxnId(txnId).orElse(null);
}

@Override
public Mono<ConsentStateDTO> consentStateByTxnId(String txnId){
return Mono.fromCallable(() -> consentStateRepository.findByTxnId(txnId).orElse(null))
.subscribeOn(dbBlockingCallSchedulerConfig.getScheduler());
}

@Override
public ConsentStateDTO updateConsentState(ConsentStateDTO consentStateDTO) {
return consentStateRepository.save(consentStateDTO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.finarkein.fiul.consent.model.ConsentRequestDTO;
import io.finarkein.fiul.consent.model.ConsentStateDTO;
import io.finarkein.fiul.consent.model.SignedConsentDTO;
import reactor.core.publisher.Mono;

import java.util.Optional;

Expand All @@ -23,7 +24,7 @@ public interface ConsentStore {

Optional<SignedConsentDTO> findSignedConsent(String consentId);

Optional<ConsentRequestDTO> findRequestByConsentHandle(String consentHandle);
Mono<Optional<ConsentRequestDTO>> findRequestByConsentHandle(String consentHandle);

Optional<ConsentRequestDTO> findRequestByConsentId(String consentId);

Expand All @@ -35,10 +36,14 @@ public interface ConsentStore {

Optional<ConsentStateDTO> getConsentStateByHandle(String consentHandle);

Mono<Optional<ConsentStateDTO>> consentStateByHandle(String consentHandle);

ConsentStateDTO getConsentStateById(String consentId);

ConsentStateDTO getConsentStateByTxnId(String txnId);

Mono<ConsentStateDTO> consentStateByTxnId(String txnId);

ConsentStateDTO updateConsentState(ConsentStateDTO consentStateDTO);

void saveSignedConsent(SignedConsentDTO signedConsentDTO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ void getConsentRequestDTOTest() {
when(consentRequestDTORepository.findById("consentHandle")).thenReturn(Optional.of(consentRequestDTO));
when(consentRequestDTORepository.findByConsentId("consentId")).thenReturn(Optional.of(consentRequestDTO));

ConsentRequestDTO returnedConsentRequestDTO = consentStoreImpl.findRequestByConsentHandle("consentHandle").orElse(null);
/*ConsentRequestDTO returnedConsentRequestDTO = consentStoreImpl
.findRequestByConsentHandle("consentHandle")
.orElse(null);
Assertions.assertEquals(returnedConsentRequestDTO, consentRequestDTO);

returnedConsentRequestDTO = consentStoreImpl.findRequestByConsentId("consentId").orElse(null);
Assertions.assertEquals(returnedConsentRequestDTO, consentRequestDTO);
Assertions.assertEquals(returnedConsentRequestDTO, consentRequestDTO);*/
}

@Test
Expand Down
Loading