Skip to content

Commit

Permalink
Core: Add Apache Ignite Repo (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoxaAntoxic authored Aug 7, 2024
1 parent e79cd06 commit f9313e7
Show file tree
Hide file tree
Showing 15 changed files with 725 additions and 17 deletions.
85 changes: 79 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ This section describes how to download, install and run the application.
git clone https://github.com/prebid/prebid-cache-java.git
```

(2). Start Redis or Aerospike:
If you have installed [Redis](https://redis.io/docs/install/install-redis/) or [Aerospike](https://aerospike.com/docs/server/operations/install) locally, you may start them both (or separately, depends on your needs) via bash:
(2). Start Repo Locally:
If you have installed [Redis](https://redis.io/docs/install/install-redis/) or [Aerospike](https://aerospike.com/docs/server/operations/install) or [Apache Ignite](https://ignite.apache.org/docs/latest/installation/deb-rpm) locally, you may start them both (or separately, depends on your needs) via bash:
```bash
sudo systemctl start redis
sudo systemctl start aerospike
sudo systemctl start apache-ignite
```
Alternatively, you may start DB as Docker image.
You should install [Docker Engine](https://docs.docker.com/engine/install/) if you don't have one.
Expand Down Expand Up @@ -67,7 +68,29 @@ docker run -d --name aerospike -e "NAMESPACE=<namespace>" -p <host_port>:<contai
docker run -d --name aerospike -e "NAMESPACE=prebid_cache" -p 3000:3000 aerospike:ce-6.4.0.2_1
```

(2.3) Make sure that the Aerospike and/or Redis is up and running
(2.3) Apache Ignite via Docker
1. Pull [Apache Ignite docker image](https://ignite.apache.org/docs/latest/installation/installing-using-docker) of an appropriate version
```bash
docker pull apacheignite/ignite:<version>
```
2. Run Apache Ignite container
- the `<version>` should correspond to the pulled image version
- the `OPTION_LIBS=ignite-rest-http` corresponds to the library that enables REST API access to the server (i.e. to create a cache inside Apache Ignite)
- the `-p 10800:10800` exposes the default port `10800` that will be a connection port for the Prebid Cache
- the host will be the `localhost`
```bash
docker run -d \
--name apache-ignite
-e "OPTION_LIBS=ignite-rest-http" \
-p 8080:8080 -p 10800:10800 -p 11211:11211 -p 47100:47100 -p 47500:47500 \
apacheignite/ignite:<version>
```
3. Create a cache via Rest Api
```bash
GET http://localhost:10800/ignite?cmd=getorcreate&cacheName=<cacheName>
```

(2.4) Make sure that the Aerospike, Redis and/or Apache Ignite is up and running
```bash
docker ps
```
Expand Down Expand Up @@ -112,8 +135,37 @@ _VM Options:_
java -jar prebid-cache.jar -Dspring.profiles.active=prod -Dlog.dir=/app/prebid-cache-java/log/
```

_Note_
The `Apache Ignite` requires additional VM parameters to be added to support Java 17+
```bash
--add-opens=java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED
--add-opens=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED
--add-opens=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED
--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.math=ALL-UNNAMED
--add-opens=java.sql/java.sql=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.time=ALL-UNNAMED
--add-opens=java.base/java.text=ALL-UNNAMED
--add-opens=java.management/sun.management=ALL-UNNAMED
--add-opens java.desktop/java.awt.font=ALL-UNNAMED
```

### _Cache Configuration_
Prebid cache uses Aerospike as a default cache implementation but also supports Redis. For switching from Aerospike
Prebid cache uses Aerospike as a default cache implementation but also supports Redis and Apache Ignite. For switching from Aerospike
to Redis replace next:

_application.yml:_
Expand All @@ -125,7 +177,13 @@ with
```yaml
spring.redis.host: value
```
```
or
```yaml
spring.ignite.host: value
```
For configuring single redis node, please use next properties:
Expand All @@ -137,11 +195,20 @@ For configuring single redis node, please use next properties:
port: value
```
or
```yaml
spring:
ignite:
host: host
port: port
cache-name: cacheName
```
It is possible to override the default YAML configuration by supplying a custom configuration. See example scenario(s) below.
###### Cluster config for Redis and Aerospike
###### Cluster config for Redis, Aerospike and Apache Ignite
Redis cluster settings
_application-default.yml:_
Expand All @@ -161,6 +228,12 @@ _application-default.yml:_
spring.aerospike.host: aerospike_host_1:port,aerospike_host_2:port,aerospike_host_3:port
```
Apache Ignite cluster settings
_application-default.yml:_
```yaml
spring.ignite.host: ignite_host_1:port,ignite_host_2:port,ignite_host_3:port
```
### _Optional: Bring Your Own (BYO) Cache Implementation_
Prebid Cache can support any cache implementation, although Redis is provided as default. Spring injects the cache repository bean instances during context initialization, or application startup. This section describes how to setup a custom cache repository.
Expand Down
4 changes: 2 additions & 2 deletions docs/differenceBetweenPBCGo-and-Java.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ and not the other for an interim period. This page tracks known differences that
- PBC-Go for `JSON` value uses an array with JSON content and additional parameters;

2) PBC-Java has different implementations of cache storage with PBC-Go:
- PBC-Java has ability to store data in `Aerospike` and `Redis`;
- PBC-Go has ability to store data in `Cassandra` , `Memcache`, `Aerospike`;
- PBC-Java has ability to store data in `Aerospike`, `Redis`, `Apache Ignite`;
- PBC-Go has ability to store data in `Cassandra` , `Memcache`, `Aerospike`, `Apache Ignite`;
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<javax.el-api.version>3.0.0</javax.el-api.version>
<reflections.version>0.10.2</reflections.version>
<aerospike-client.version>6.2.0</aerospike-client.version>
<ignite-client.version>2.16.0</ignite-client.version>
<reactor-extra.version>3.5.1</reactor-extra.version>
<!-- Wiremock 3.4.0+ has incompatibility with the mockserver client (do we really need both?) -->
<wiremock.version>3.3.1</wiremock.version>
Expand Down Expand Up @@ -215,6 +216,11 @@
<artifactId>aerospike-client</artifactId>
<version>${aerospike-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${ignite-client.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/prebid/cache/config/RepositoryConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import com.aerospike.client.policy.Policy;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import org.apache.ignite.client.ClientCache;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.CacheConfig;
import org.prebid.cache.repository.CircuitBreakerSecuredReactiveRepository;
import org.prebid.cache.repository.ReactiveRepository;
import org.prebid.cache.repository.TimeOutCapableReactiveRepository;
import org.prebid.cache.repository.aerospike.AerospikePropertyConfiguration;
import org.prebid.cache.repository.aerospike.AerospikeRepositoryImpl;
import org.prebid.cache.repository.ignite.IgniteRepositoryImpl;
import org.prebid.cache.repository.redis.RedisRepositoryImpl;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -41,6 +43,12 @@ ReactiveRepository<PayloadWrapper, String> aerospikeRepository(AerospikeProperty
return new AerospikeRepositoryImpl(configuration, client, eventLoops, policy);
}

@Bean
@ConditionalOnProperty(prefix = "spring.ignite", name = {"host"})
ReactiveRepository<PayloadWrapper, String> igniteRepository(ClientCache<String, String> igniteCache) {
return new IgniteRepositoryImpl(igniteCache);
}

@Bean
@Primary
ReactiveRepository<PayloadWrapper, String> circuitBreakerSecuredRepository(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.prebid.cache.repository.ignite;

import jakarta.validation.constraints.NotNull;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.SslMode;
import org.apache.ignite.configuration.ClientConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

import static java.util.Objects.requireNonNull;

@Configuration
@EnableConfigurationProperties(IgniteConfigurationProperties.class)
@ConditionalOnProperty(prefix = "spring.ignite", name = {"host"})
public class IgniteConfiguration {

private static final int DEFAULT_PORT = 10800;

@Bean
public ClientConfiguration clientConfiguration(IgniteConfigurationProperties properties) {
ClientConfiguration cfg = new ClientConfiguration();

final String host = properties.getHost();
final int port = properties.getPort() == null ? DEFAULT_PORT : properties.getPort();

if (isCluster(host)) {
cfg.setAddresses(extractHosts(host));
} else {
cfg.setAddresses(host + ":" + port);
}

cfg.setSslMode(BooleanUtils.isTrue(properties.getSecure()) ? SslMode.REQUIRED : SslMode.DISABLED);
return cfg;
}

public static boolean isCluster(@NotNull String hostList) {
return hostList.split(",").length > 1;
}

private static String[] extractHosts(@NotNull String hostList) {
return Arrays.stream(hostList.split(","))
.map(host -> {
String[] params = host.split(":");
String hostname = requireNonNull(params[0]);
int port = DEFAULT_PORT;
if (params.length == 2) {
port = Integer.parseInt(params[1]);
}
return hostname + ":" + port;
})
.toArray(String[]::new);
}

@Bean(destroyMethod = "close")
public IgniteClient igniteClient(ClientConfiguration clientConfiguration) throws ClientConnectionException {
return Ignition.startClient(clientConfiguration);
}

@Bean
public ClientCache<String, String> igniteCache(IgniteClient igniteClient,
IgniteConfigurationProperties properties) {

return igniteClient.cache(properties.getCacheName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.prebid.cache.repository.ignite;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "spring.ignite")
public class IgniteConfigurationProperties {

private String host;

private Integer port;

private String cacheName;

private Boolean secure;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.prebid.cache.repository.ignite;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientException;
import org.prebid.cache.exceptions.PayloadWrapperPropertyException;
import org.prebid.cache.exceptions.RepositoryException;
import org.prebid.cache.helpers.Json;
import org.prebid.cache.model.PayloadWrapper;
import org.prebid.cache.repository.ReactiveRepository;
import reactor.core.publisher.Mono;

import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import java.util.concurrent.TimeUnit;

@Slf4j
public class IgniteRepositoryImpl implements ReactiveRepository<PayloadWrapper, String> {

private final ClientCache<String, String> cache;

public IgniteRepositoryImpl(ClientCache<String, String> cache) {
this.cache = cache;
}

@Override
public Mono<PayloadWrapper> save(PayloadWrapper wrapper) {
ExpiryPolicy expiryPolicy;
String normalizedId;

try {
normalizedId = wrapper.getNormalizedId();
expiryPolicy = new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, wrapper.getExpiry()));
} catch (PayloadWrapperPropertyException e) {
log.error("Exception occurred while extracting normalized id from payload: '{}', cause: '{}'",
ExceptionUtils.getMessage(e), ExceptionUtils.getMessage(e));
return Mono.empty();
}

final ClientCache<String, String> expiredCache = cache.withExpirePolicy(expiryPolicy);
return Mono.fromFuture(expiredCache.putIfAbsentAsync(normalizedId, Json.toJson(wrapper)).toCompletableFuture())
.map(payload -> wrapper)
.onErrorResume(IgniteRepositoryImpl::handleError);
}

@Override
public Mono<PayloadWrapper> findById(String id) {
return Mono.fromFuture(cache.getAsync(id).toCompletableFuture())
.map(json -> Json.createPayloadFromJson(json, PayloadWrapper.class))
.onErrorResume(IgniteRepositoryImpl::handleError);
}

private static <T> Mono<T> handleError(Throwable throwable) {
if (throwable instanceof ClientException) {
return Mono.error(new RepositoryException(throwable.toString(), throwable));
}

return Mono.error(throwable);
}
}
Loading

0 comments on commit f9313e7

Please sign in to comment.