Skip to content

Commit

Permalink
Merge pull request #1 from ContainerTag/HB-2599-Replacing-redis-with-…
Browse files Browse the repository at this point in the history
…aerospike

Hb 2599 replacing redis with aerospike
  • Loading branch information
Yevhen Terentiev authored and GitHub Enterprise committed Jun 14, 2018
2 parents da008c8 + b82e088 commit e61c44f
Show file tree
Hide file tree
Showing 17 changed files with 642 additions and 240 deletions.
414 changes: 213 additions & 201 deletions pom.xml

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions src/main/java/org/prebid/cache/PBCacheApplication.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.prebid.cache;

import org.prebid.cache.config.CorsConfig;
import io.netty.util.internal.SystemPropertyUtil;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/prebid/cache/handlers/CacheHandler.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.prebid.cache.handlers;

import com.aerospike.client.AerospikeException;
import com.codahale.metrics.Timer;
import org.prebid.cache.exceptions.*;
import lombok.extern.slf4j.Slf4j;
import org.prebid.cache.exceptions.PrebidException;
import org.prebid.cache.exceptions.RepositoryException;
Expand Down Expand Up @@ -48,6 +48,8 @@ <T>Mono<T> validateErrorResult(final Mono<T> mono) {
return Mono.error(new RequestParsingException(t.toString()));
} else if (t instanceof org.springframework.web.server.UnsupportedMediaTypeStatusException) {
return Mono.error(new UnsupportedMediaTypeException(t.toString()));
} else if (t instanceof AerospikeException) {
return Mono.error(new AerospikeException(t.toString()));
} else {
return Mono.error(t);
}
Expand Down
36 changes: 18 additions & 18 deletions src/main/java/org/prebid/cache/handlers/PostCacheHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class PostCacheHandler extends CacheHandler {
private final CacheConfig config;
private final Supplier<Date> currentDateProvider;
private final Function<PayloadWrapper, Map<String, String>> payloadWrapperToMapTransformer = payload ->
ImmutableMap.of(UUID_KEY, payload.getId());
ImmutableMap.of(UUID_KEY, payload.getId());

@Autowired
public PostCacheHandler(final ReactiveRepository<PayloadWrapper, String> repository,
Expand All @@ -61,20 +61,20 @@ public PostCacheHandler(final ReactiveRepository<PayloadWrapper, String> reposit
public Mono<ServerResponse> save(final ServerRequest request) {
metricsRecorder.markMeterForClass(this.getClass(), MetricsRecorder.MeasurementTag.REQUEST_RATE);
val timerContext = metricsRecorder.createRequestContextTimerOptionalForServiceType(type)
.orElse(null);
.orElse(null);
val bodyMono = request.bodyToMono(RequestObject.class);
val monoList = bodyMono.map(RequestObject::getPuts);
val flux = monoList.flatMapMany(Flux::fromIterable);
val payloadFlux = flux.map(payload -> payload.toBuilder()
.prefix(config.getPrefix())
.expiry(payload.getExpiry() == null ? config.getExpirySec() : payload.getExpiry())
.build())
.map(payloadWrapperTransformer(currentDateProvider))
.handle(this::validateUUID)
.handle(this::validateExpiry)
.flatMap(repository::save)
.timeout(Duration.ofMillis(config.getTimeoutMs()))
.subscribeOn(Schedulers.parallel());
.prefix(config.getPrefix())
.expiry(payload.getExpiry() == null ? config.getExpirySec() : payload.getExpiry())
.build())
.map(payloadWrapperTransformer(currentDateProvider))
.handle(this::validateUUID)
.handle(this::validateExpiry)
.concatMap(repository::save)
.timeout(Duration.ofMillis(config.getTimeoutMs()))
.subscribeOn(Schedulers.parallel());

final Mono<ServerResponse> responseMono = payloadFlux
.map(payloadWrapperToMapTransformer)
Expand All @@ -94,13 +94,13 @@ public Mono<ServerResponse> save(final ServerRequest request) {

private Function<PayloadTransfer, PayloadWrapper> payloadWrapperTransformer(Supplier<Date> currentDateProvider) {
return transfer ->
new PayloadWrapper(
RandomUUID.extractUUID(transfer),
transfer.getPrefix(),
new Payload(transfer.getType(), transfer.getKey(), transfer.getValue()),
transfer.getExpiry(),
currentDateProvider.get()
);
new PayloadWrapper(
RandomUUID.extractUUID(transfer),
transfer.getPrefix(),
new Payload(transfer.getType(), transfer.getKey(), transfer.getValue()),
transfer.getExpiry(),
currentDateProvider.get()
);
}

private void validateUUID(final PayloadWrapper payload, final SynchronousSink<PayloadWrapper> sink) {
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/org/prebid/cache/listners/AerospikeReadListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.prebid.cache.listners;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordListener;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.MonoSink;

import java.util.Objects;

@Slf4j
public class AerospikeReadListener implements RecordListener {
private final static String NAME = "cache";
private final MonoSink<String> sink;
private final String recordKeyId;

public AerospikeReadListener(MonoSink<String> sink, String recordKeyId) {
this.sink = sink;
this.recordKeyId = recordKeyId;
}

@Override
public void onSuccess(Key key, Record record) {
if (Objects.nonNull(record)) {
sink.success(record.getString(NAME));
} else {
sink.success();
}
}

@Override
public void onFailure(AerospikeException exception) {
log.error("Error when reading record with keyId {}", recordKeyId);
sink.error(exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.prebid.cache.listners;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.listener.WriteListener;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.MonoSink;

@Slf4j
public class AerospikeWriteListener implements WriteListener {
private final MonoSink<String> sink;
private final String keyId;

public AerospikeWriteListener(MonoSink<String> sink, String keyId) {
this.sink = sink;
this.keyId = keyId;
}

@Override
public void onSuccess(Key key) {
sink.success(key.userKey.toString());
}

@Override
public void onFailure(AerospikeException exception) {
log.error("Error when writing record with keyID : {}", keyId);
sink.error(exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.prebid.cache.repository.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.async.AsyncClientPolicy;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.policy.Policy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.validation.constraints.NotNull;
import java.util.Arrays;

import static java.util.Objects.requireNonNull;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Configuration
@EnableConfigurationProperties
@ConfigurationProperties(prefix = "spring.aerospike")
public class AerospikePropertyConfiguration {
private String host;
private Integer port;
private String password;
private Integer cores;
private Long firstBackoff;
private Long maxBackoff;
private int maxRetry;
private String namespace;

private static final int DEFAULT_PORT = 3000;

public static Host[] extractHosts(@NotNull String hostList) {
return Arrays.stream(hostList.split(","))
.map(host -> {
String[] params = host.split(":");
String hostname = requireNonNull(params[0]);
int port = DEFAULT_PORT;
if (params.length == 2) {
port = Integer.parseInt(params[1]);
}
return new Host(hostname, port);
})
.toArray(Host[]::new);
}

public static boolean isAerospikeCluster(@NotNull String hostList) {
if (hostList.split(",").length > 1) {
return true;
}
return false;
}

@Bean
Policy readPolicy() {
return new Policy();
}

@Bean
EventPolicy eventPolicy() {
return new EventPolicy();
}

@Bean
EventLoopGroup eventGroup() {
return new NioEventLoopGroup(cores);
}

@Bean
EventLoops eventLoops() {
return new NettyEventLoops(eventPolicy(), eventGroup());
}

@Bean
AsyncClientPolicy clientPolicy() {
AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
clientPolicy.eventLoops = eventLoops();
return clientPolicy;
}

@Bean(destroyMethod = "close")
AerospikeClient client() {
if (isAerospikeCluster(getHost())) {
return new AerospikeClient(clientPolicy(), extractHosts(getHost()));
} else {
return new AerospikeClient(clientPolicy(), getHost(), getPort());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.prebid.cache.repository.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.ResultCode;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.WritePolicy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.prebid.cache.exceptions.PayloadWrapperPropertyException;
import org.prebid.cache.helpers.Json;
import org.prebid.cache.listners.AerospikeReadListener;
import org.prebid.cache.listners.AerospikeWriteListener;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.ReactiveRepository;
import reactor.core.publisher.Mono;
import reactor.retry.Retry;

import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

@Slf4j
@RequiredArgsConstructor
public class AerospikeRepositoryImpl implements ReactiveRepository<PayloadWrapper, String> {
@NotNull
private final AerospikePropertyConfiguration configuration;
@NotNull
private final AerospikeClient client;
@NotNull
private final EventLoops eventLoops;
@NotNull
private final Policy policy;
private WritePolicy writePolicy;

private final static String BIN_NAME="cache";

@Override
public Mono save(final PayloadWrapper wrapper) {
long expiry;
String normalizedId;
WritePolicy policy = writePolicy();

try {
expiry = wrapper.getExpiry();
normalizedId = wrapper.getNormalizedId();
policy.expiration = (int) expiry;
} catch (PayloadWrapperPropertyException e) {
e.printStackTrace();
return Mono.empty();
}

return Mono.<String>create(sink -> client.put(eventLoops.next(),
new AerospikeWriteListener(sink, normalizedId), policy,
new Key(configuration.getNamespace(), "", normalizedId),
new Bin(BIN_NAME, Json.toJson(wrapper)))).map(payload -> wrapper)
.retryWhen(getRetryPolicy());
}

@Override
public Mono<PayloadWrapper> findById(String id) {
return Mono.<String>create(sink -> client.get(eventLoops.next(),
new AerospikeReadListener(sink, id),
policy, new Key(configuration.getNamespace(), "", id)))
.map(json -> Json.createPayloadFromJson(json, PayloadWrapper.class))
.retryWhen(getRetryPolicy());
}

private WritePolicy writePolicy() {
if (Objects.isNull(writePolicy)) {
writePolicy = new WritePolicy();
}
return writePolicy;
}

private List<Integer> getRetryCodes() {
return Arrays.asList(ResultCode.GENERATION_ERROR, ResultCode.KEY_EXISTS_ERROR, ResultCode.KEY_NOT_FOUND_ERROR);
}

private Retry<Object> getRetryPolicy() {
Duration firstBackoff = Duration.ofMillis(configuration.getFirstBackoff());
Duration maxBackoff = Duration.ofMillis(configuration.getMaxBackoff());

return Retry.onlyIf(context -> context.exception() instanceof AerospikeException
&& getRetryCodes().contains(((AerospikeException) context.exception()).getResultCode())
).doOnRetry(context -> log.warn("Retrying context {}", context))
.retryMax(configuration.getMaxRetry())
.exponentialBackoffWithJitter(firstBackoff, maxBackoff);
}
}
17 changes: 12 additions & 5 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ management.health.defaults.enabled: false
management.endpoints.enabled-by-default: false
management.endpoints.web.base-path: /
management.health.diskspace.enabled: true
management.health.redis.enabled: true
management.health.redis.enabled: false
management.endpoints.web.exposure.include: info, health, metrics, env, configprops
management.endpoint.info.enabled: true
management.endpoint.health.enabled: true
Expand Down Expand Up @@ -70,11 +70,18 @@ metrics:

---
spring.profiles: local
cache.expiry_sec: 28800
cache.expiry_sec: 300
metrics.graphite.enabled: false
spring.redis.host: localhost
spring.redis.port: 6379
spring.redis.password:
spring:
aerospike:
port: 3000
host: localhost
cores: 4
password:
first_backoff: 300
max_backoff: 1000
max_retry: 3
namespace: "prebid_cache"
#spring.redis.sentinel.master: mymaster
#spring.redis.sentinel.nodes: localhost:26379

Expand Down
6 changes: 3 additions & 3 deletions src/main/resources/repository.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

---
cache.profiles.active: redis
cache.redis.classname.canonical: org.prebid.cache.repository.redis.RedisRepositoryImpl
cache.redis.property.configuration.classname: RedisPropertyConfiguration
cache.profiles.active: aerospike
cache.aerospike.classname.canonical: org.prebid.cache.repository.aerospike.AerospikeRepositoryImpl
cache.aerospike.property.configuration.classname: AerospikePropertyConfiguration
Loading

0 comments on commit e61c44f

Please sign in to comment.