diff --git a/pom.xml b/pom.xml
index e380a19..dbaf992 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,47 +1,47 @@
- 4.0.0
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
- org.prebid.cache
- prebid-cache
- 0.0.1-SNAPSHOT
+ org.prebid.cache
+ prebid-cache
+ 0.0.1-SNAPSHOT
- prebid-cache
- Prebid Cache
-
- Prebid.org
- http://prebid.org/
-
+ prebid-cache
+ Prebid Cache
+
+ Prebid.org
+ http://prebid.org/
+
-
- org.springframework.boot
- spring-boot-starter-parent
- 2.0.2.RELEASE
-
-
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.0.2.RELEASE
+
+
-
+
src/main/resources
- org.prebid.cache.PBCacheApplication
- 5.0.3
+ org.prebid.cache.PBCacheApplication
+ 5.0.3
-
- 0
- manual
- ${maven.build.timestamp}
- manual
- yyyy-MM-dd HH:mm
+
+ 0
+ manual
+ ${maven.build.timestamp}
+ manual
+ yyyy-MM-dd HH:mm
- 1.8
- UTF-8
- UTF-8
- 3.0.1
- 3.0.0
- 2.20.1
- 3.7.0
- 0.7.9
- 2.0.2.RELEASE
+ 1.8
+ UTF-8
+ UTF-8
+ 3.0.1
+ 3.0.0
+ 2.20.1
+ 3.7.0
+ 0.7.9
+ 2.0.2.RELEASE
3.1.7.RELEASE
5.0.4.RELEASE
2.8.2
@@ -49,136 +49,148 @@
3.4.2
1.16.18
4.0.2
- 3.0.0
+ 3.0.0
0.1.4
0.9.11
2.4.3
-
+ 4.1.6
+ 3.1.6.RELEASE
+
-
-
- org.springframework.boot
- spring-boot-starter-webflux
-
-
- org.springframework.boot
- spring-boot-starter-tomcat
-
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+
org.springframework.boot
spring-boot-starter-logging
-
-
+
+
org.springframework.boot
spring-boot-configuration-processor
true
-
- org.springframework.boot
- spring-boot-starter-log4j2
-
-
- org.springframework.boot
- spring-boot-starter-actuator
-
-
- org.springframework.boot
- spring-boot-starter-data-redis
-
-
- io.projectreactor
- reactor-core
+
+ org.springframework.boot
+ spring-boot-starter-log4j2
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
+ io.projectreactor
+ reactor-core
${reactor-core.version}
-
-
- io.lettuce
- lettuce-core
- ${lettuce-core.version}
-
+
+
+ io.lettuce
+ lettuce-core
+ ${lettuce-core.version}
+
com.google.code.gson
gson
${gson.version}
-
- com.google.guava
- guava
- ${guava.version}
-
+
+ com.google.guava
+ guava
+ ${guava.version}
+
com.lmax
disruptor
${disruptor.version}
-
- org.projectlombok
- lombok
- ${lombok.version}
- provided
-
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
io.dropwizard.metrics
metrics-graphite
${metrics-graphite.version}
-
- javax.el
- javax.el-api
- ${javax.el-api.version}
-
+
+ javax.el
+ javax.el-api
+ ${javax.el-api.version}
+
org.springframework.boot
spring-boot-starter-test
test
-
- org.junit.jupiter
- junit-jupiter-engine
- ${junit-jupiter.version}
- test
-
-
- org.junit.jupiter
- junit-jupiter-params
- ${junit-jupiter.version}
- test
-
-
- io.projectreactor
- reactor-test
- test
-
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit-jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${junit-jupiter.version}
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
com.github.cjnygard
rest-maven-plugin
${rest-maven-plugin.version}
-
- org.reflections
- reflections
- ${reflections.version}
- test
-
-
- nl.jqno.equalsverifier
- equalsverifier
- ${equals-verifier.version}
- test
-
-
+
+ org.reflections
+ reflections
+ ${reflections.version}
+ test
+
+
+ nl.jqno.equalsverifier
+ equalsverifier
+ ${equals-verifier.version}
+ test
+
+
+ com.aerospike
+ aerospike-client
+ ${aerospike-client.version}
+
+
+ io.projectreactor.addons
+ reactor-extra
+ ${reactor-extra.version}
+
+
-
- ${project.name}
-
-
- src/main/resources
- true
-
-
-
+
+ ${project.name}
+
+
+ src/main/resources
+ true
+
+
+
maven-clean-plugin
${maven-clean-plugin.version}
@@ -217,36 +229,36 @@
-
- true
- src/scripts/custom_launch.script
-
+
+ true
+ src/scripts/custom_launch.script
+
+
+
+ org.codehaus.gmavenplus
+ gmavenplus-plugin
+ 1.6
+
+
+
+ execute
+
+
+
+
+
+
+
+
+
+
+ org.codehaus.groovy
+ groovy-all
+ 2.4.12
+ runtime
+
+
-
- org.codehaus.gmavenplus
- gmavenplus-plugin
- 1.6
-
-
-
- execute
-
-
-
-
-
-
-
-
-
-
- org.codehaus.groovy
- groovy-all
- 2.4.12
- runtime
-
-
-
com.github.cjnygard
rest-maven-plugin
@@ -280,55 +292,55 @@
-
-
+
+
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
- false
-
-
-
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/milestone
+
+ false
+
+
+
-
-
- spring-snapshots
- Spring Snapshots
- https://repo.spring.io/snapshot
-
- true
-
-
-
- spring-milestones
- Spring Milestones
- https://repo.spring.io/milestone
-
- false
-
-
-
- mvnrepository
- http://repo1.maven.org/maven2
-
- true
-
-
- true
-
-
-
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/milestone
+
+ false
+
+
+
+ mvnrepository
+ http://repo1.maven.org/maven2
+
+ true
+
+
+ true
+
+
+
diff --git a/src/main/java/org/prebid/cache/PBCacheApplication.java b/src/main/java/org/prebid/cache/PBCacheApplication.java
index f8aadb8..a625266 100644
--- a/src/main/java/org/prebid/cache/PBCacheApplication.java
+++ b/src/main/java/org/prebid/cache/PBCacheApplication.java
@@ -1,9 +1,7 @@
package org.prebid.cache;
import org.prebid.cache.config.CorsConfig;
-import io.netty.util.internal.SystemPropertyUtil;
import lombok.extern.slf4j.Slf4j;
-import lombok.val;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
diff --git a/src/main/java/org/prebid/cache/handlers/CacheHandler.java b/src/main/java/org/prebid/cache/handlers/CacheHandler.java
index a73b319..1c5c691 100644
--- a/src/main/java/org/prebid/cache/handlers/CacheHandler.java
+++ b/src/main/java/org/prebid/cache/handlers/CacheHandler.java
@@ -1,7 +1,7 @@
package org.prebid.cache.handlers;
+import com.aerospike.client.AerospikeException;
import com.codahale.metrics.Timer;
-import org.prebid.cache.exceptions.*;
import lombok.extern.slf4j.Slf4j;
import org.prebid.cache.exceptions.PrebidException;
import org.prebid.cache.exceptions.RepositoryException;
@@ -48,6 +48,8 @@ Mono validateErrorResult(final Mono mono) {
return Mono.error(new RequestParsingException(t.toString()));
} else if (t instanceof org.springframework.web.server.UnsupportedMediaTypeStatusException) {
return Mono.error(new UnsupportedMediaTypeException(t.toString()));
+ } else if (t instanceof AerospikeException) {
+ return Mono.error(new AerospikeException(t.toString()));
} else {
return Mono.error(t);
}
diff --git a/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java b/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java
index c222051..78bcc1f 100644
--- a/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java
+++ b/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java
@@ -42,7 +42,7 @@ public class PostCacheHandler extends CacheHandler {
private final CacheConfig config;
private final Supplier currentDateProvider;
private final Function> payloadWrapperToMapTransformer = payload ->
- ImmutableMap.of(UUID_KEY, payload.getId());
+ ImmutableMap.of(UUID_KEY, payload.getId());
@Autowired
public PostCacheHandler(final ReactiveRepository repository,
@@ -61,20 +61,20 @@ public PostCacheHandler(final ReactiveRepository reposit
public Mono save(final ServerRequest request) {
metricsRecorder.markMeterForClass(this.getClass(), MetricsRecorder.MeasurementTag.REQUEST_RATE);
val timerContext = metricsRecorder.createRequestContextTimerOptionalForServiceType(type)
- .orElse(null);
+ .orElse(null);
val bodyMono = request.bodyToMono(RequestObject.class);
val monoList = bodyMono.map(RequestObject::getPuts);
val flux = monoList.flatMapMany(Flux::fromIterable);
val payloadFlux = flux.map(payload -> payload.toBuilder()
- .prefix(config.getPrefix())
- .expiry(payload.getExpiry() == null ? config.getExpirySec() : payload.getExpiry())
- .build())
- .map(payloadWrapperTransformer(currentDateProvider))
- .handle(this::validateUUID)
- .handle(this::validateExpiry)
- .flatMap(repository::save)
- .timeout(Duration.ofMillis(config.getTimeoutMs()))
- .subscribeOn(Schedulers.parallel());
+ .prefix(config.getPrefix())
+ .expiry(payload.getExpiry() == null ? config.getExpirySec() : payload.getExpiry())
+ .build())
+ .map(payloadWrapperTransformer(currentDateProvider))
+ .handle(this::validateUUID)
+ .handle(this::validateExpiry)
+ .concatMap(repository::save)
+ .timeout(Duration.ofMillis(config.getTimeoutMs()))
+ .subscribeOn(Schedulers.parallel());
final Mono responseMono = payloadFlux
.map(payloadWrapperToMapTransformer)
@@ -94,13 +94,13 @@ public Mono save(final ServerRequest request) {
private Function payloadWrapperTransformer(Supplier currentDateProvider) {
return transfer ->
- new PayloadWrapper(
- RandomUUID.extractUUID(transfer),
- transfer.getPrefix(),
- new Payload(transfer.getType(), transfer.getKey(), transfer.getValue()),
- transfer.getExpiry(),
- currentDateProvider.get()
- );
+ new PayloadWrapper(
+ RandomUUID.extractUUID(transfer),
+ transfer.getPrefix(),
+ new Payload(transfer.getType(), transfer.getKey(), transfer.getValue()),
+ transfer.getExpiry(),
+ currentDateProvider.get()
+ );
}
private void validateUUID(final PayloadWrapper payload, final SynchronousSink sink) {
diff --git a/src/main/java/org/prebid/cache/listners/AerospikeReadListener.java b/src/main/java/org/prebid/cache/listners/AerospikeReadListener.java
new file mode 100644
index 0000000..cd85783
--- /dev/null
+++ b/src/main/java/org/prebid/cache/listners/AerospikeReadListener.java
@@ -0,0 +1,37 @@
+package org.prebid.cache.listners;
+
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.Key;
+import com.aerospike.client.Record;
+import com.aerospike.client.listener.RecordListener;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.MonoSink;
+
+import java.util.Objects;
+
+@Slf4j
+public class AerospikeReadListener implements RecordListener {
+ private final static String NAME = "cache";
+ private final MonoSink sink;
+ private final String recordKeyId;
+
+ public AerospikeReadListener(MonoSink sink, String recordKeyId) {
+ this.sink = sink;
+ this.recordKeyId = recordKeyId;
+ }
+
+ @Override
+ public void onSuccess(Key key, Record record) {
+ if (Objects.nonNull(record)) {
+ sink.success(record.getString(NAME));
+ } else {
+ sink.success();
+ }
+ }
+
+ @Override
+ public void onFailure(AerospikeException exception) {
+ log.error("Error when reading record with keyId {}", recordKeyId);
+ sink.error(exception);
+ }
+}
diff --git a/src/main/java/org/prebid/cache/listners/AerospikeWriteListener.java b/src/main/java/org/prebid/cache/listners/AerospikeWriteListener.java
new file mode 100644
index 0000000..45b84c4
--- /dev/null
+++ b/src/main/java/org/prebid/cache/listners/AerospikeWriteListener.java
@@ -0,0 +1,29 @@
+package org.prebid.cache.listners;
+
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.Key;
+import com.aerospike.client.listener.WriteListener;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.MonoSink;
+
+@Slf4j
+public class AerospikeWriteListener implements WriteListener {
+ private final MonoSink sink;
+ private final String keyId;
+
+ public AerospikeWriteListener(MonoSink sink, String keyId) {
+ this.sink = sink;
+ this.keyId = keyId;
+ }
+
+ @Override
+ public void onSuccess(Key key) {
+ sink.success(key.userKey.toString());
+ }
+
+ @Override
+ public void onFailure(AerospikeException exception) {
+ log.error("Error when writing record with keyID : {}", keyId);
+ sink.error(exception);
+ }
+}
diff --git a/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java b/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java
new file mode 100644
index 0000000..1c25810
--- /dev/null
+++ b/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java
@@ -0,0 +1,100 @@
+package org.prebid.cache.repository.aerospike;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Host;
+import com.aerospike.client.async.AsyncClientPolicy;
+import com.aerospike.client.async.EventLoops;
+import com.aerospike.client.async.EventPolicy;
+import com.aerospike.client.async.NettyEventLoops;
+import com.aerospike.client.policy.Policy;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.validation.constraints.NotNull;
+import java.util.Arrays;
+
+import static java.util.Objects.requireNonNull;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Configuration
+@EnableConfigurationProperties
+@ConfigurationProperties(prefix = "spring.aerospike")
+public class AerospikePropertyConfiguration {
+ private String host;
+ private Integer port;
+ private String password;
+ private Integer cores;
+ private Long firstBackoff;
+ private Long maxBackoff;
+ private int maxRetry;
+ private String namespace;
+
+ private static final int DEFAULT_PORT = 3000;
+
+ public static Host[] extractHosts(@NotNull String hostList) {
+ return Arrays.stream(hostList.split(","))
+ .map(host -> {
+ String[] params = host.split(":");
+ String hostname = requireNonNull(params[0]);
+ int port = DEFAULT_PORT;
+ if (params.length == 2) {
+ port = Integer.parseInt(params[1]);
+ }
+ return new Host(hostname, port);
+ })
+ .toArray(Host[]::new);
+ }
+
+ public static boolean isAerospikeCluster(@NotNull String hostList) {
+ if (hostList.split(",").length > 1) {
+ return true;
+ }
+ return false;
+ }
+
+ @Bean
+ Policy readPolicy() {
+ return new Policy();
+ }
+
+ @Bean
+ EventPolicy eventPolicy() {
+ return new EventPolicy();
+ }
+
+ @Bean
+ EventLoopGroup eventGroup() {
+ return new NioEventLoopGroup(cores);
+ }
+
+ @Bean
+ EventLoops eventLoops() {
+ return new NettyEventLoops(eventPolicy(), eventGroup());
+ }
+
+ @Bean
+ AsyncClientPolicy clientPolicy() {
+ AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
+ clientPolicy.eventLoops = eventLoops();
+ return clientPolicy;
+ }
+
+ @Bean(destroyMethod = "close")
+ AerospikeClient client() {
+ if (isAerospikeCluster(getHost())) {
+ return new AerospikeClient(clientPolicy(), extractHosts(getHost()));
+ } else {
+ return new AerospikeClient(clientPolicy(), getHost(), getPort());
+ }
+ }
+
+}
diff --git a/src/main/java/org/prebid/cache/repository/aerospike/AerospikeRepositoryImpl.java b/src/main/java/org/prebid/cache/repository/aerospike/AerospikeRepositoryImpl.java
new file mode 100644
index 0000000..8d60421
--- /dev/null
+++ b/src/main/java/org/prebid/cache/repository/aerospike/AerospikeRepositoryImpl.java
@@ -0,0 +1,95 @@
+package org.prebid.cache.repository.aerospike;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.AerospikeException;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Key;
+import com.aerospike.client.ResultCode;
+import com.aerospike.client.async.EventLoops;
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.WritePolicy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.prebid.cache.exceptions.PayloadWrapperPropertyException;
+import org.prebid.cache.helpers.Json;
+import org.prebid.cache.listners.AerospikeReadListener;
+import org.prebid.cache.listners.AerospikeWriteListener;
+import org.prebid.cache.model.PayloadWrapper;
+import org.prebid.cache.repository.ReactiveRepository;
+import reactor.core.publisher.Mono;
+import reactor.retry.Retry;
+
+import javax.validation.constraints.NotNull;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+@RequiredArgsConstructor
+public class AerospikeRepositoryImpl implements ReactiveRepository {
+ @NotNull
+ private final AerospikePropertyConfiguration configuration;
+ @NotNull
+ private final AerospikeClient client;
+ @NotNull
+ private final EventLoops eventLoops;
+ @NotNull
+ private final Policy policy;
+ private WritePolicy writePolicy;
+
+ private final static String BIN_NAME="cache";
+
+ @Override
+ public Mono save(final PayloadWrapper wrapper) {
+ long expiry;
+ String normalizedId;
+ WritePolicy policy = writePolicy();
+
+ try {
+ expiry = wrapper.getExpiry();
+ normalizedId = wrapper.getNormalizedId();
+ policy.expiration = (int) expiry;
+ } catch (PayloadWrapperPropertyException e) {
+ e.printStackTrace();
+ return Mono.empty();
+ }
+
+ return Mono.create(sink -> client.put(eventLoops.next(),
+ new AerospikeWriteListener(sink, normalizedId), policy,
+ new Key(configuration.getNamespace(), "", normalizedId),
+ new Bin(BIN_NAME, Json.toJson(wrapper)))).map(payload -> wrapper)
+ .retryWhen(getRetryPolicy());
+ }
+
+ @Override
+ public Mono findById(String id) {
+ return Mono.create(sink -> client.get(eventLoops.next(),
+ new AerospikeReadListener(sink, id),
+ policy, new Key(configuration.getNamespace(), "", id)))
+ .map(json -> Json.createPayloadFromJson(json, PayloadWrapper.class))
+ .retryWhen(getRetryPolicy());
+ }
+
+ private WritePolicy writePolicy() {
+ if (Objects.isNull(writePolicy)) {
+ writePolicy = new WritePolicy();
+ }
+ return writePolicy;
+ }
+
+ private List getRetryCodes() {
+ return Arrays.asList(ResultCode.GENERATION_ERROR, ResultCode.KEY_EXISTS_ERROR, ResultCode.KEY_NOT_FOUND_ERROR);
+ }
+
+ private Retry