Skip to content

Commit

Permalink
Adds workaround for bug in ehcache remove()
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
Peter Alfonsi committed Mar 17, 2024
1 parent bb9aad1 commit bad40cd
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -93,8 +94,8 @@ public class EhcacheDiskCache<K, V> implements ICache<K, V> {
// A Cache manager can create many caches.
private final PersistentCacheManager cacheManager;

// Disk cache
private Cache<K, byte[]> cache;
// Disk cache. Because of a bug in ehcache, we have to store ByteArrayWrapper rather than byte[].
private Cache<K, ByteArrayWrapper> cache;
private final long maxWeightInBytes;
private final String storagePath;
private final Class<K> keyType;
Expand Down Expand Up @@ -151,27 +152,27 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
}

private Cache<K, byte[]> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
private Cache<K, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
try {
return this.cacheManager.createCache(
this.diskCacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
this.keyType,
byte[].class,
ByteArrayWrapper.class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<>() {
@Override
public Duration getExpiryForCreation(K key, byte[] value) {
public Duration getExpiryForCreation(K key, ByteArrayWrapper value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(K key, Supplier<? extends byte[]> value) {
public Duration getExpiryForAccess(K key, Supplier<? extends ByteArrayWrapper> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(K key, Supplier<? extends byte[]> oldValue, byte[] newValue) {
public Duration getExpiryForUpdate(K key, Supplier<? extends ByteArrayWrapper> oldValue, ByteArrayWrapper newValue) {
return INFINITE;
}
})
Expand All @@ -185,7 +186,8 @@ public Duration getExpiryForUpdate(K key, Supplier<? extends byte[]> oldValue, b
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
)
)
.withKeySerializer(new KeySerializerWrapper(keySerializer))
.withKeySerializer(new KeySerializerWrapper<K>(keySerializer))
.withValueSerializer(new ByteArrayWrapperSerializer())
);
} catch (IllegalArgumentException ex) {
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
Expand Down Expand Up @@ -248,7 +250,7 @@ public V get(K key) {
}
V value;
try {
value = valueSerializer.deserialize(cache.get(key));
value = deserializeValue(cache.get(key));
} catch (CacheLoadingException ex) {
throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache");
}
Expand All @@ -263,7 +265,7 @@ public V get(K key) {
@Override
public void put(K key, V value) {
try {
cache.put(key, valueSerializer.serialize(value));
cache.put(key, serializeValue(value));
} catch (CacheWritingException ex) {
throw new OpenSearchException("Exception occurred while put item to ehcache disk cache");
}
Expand All @@ -281,7 +283,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
// Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is
// not performant in case there are multiple concurrent request for same key. Below is our own custom
// implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation.
V value = valueSerializer.deserialize(cache.get(key));
V value = deserializeValue(cache.get(key));
if (value == null) {
value = compute(key, loader);
}
Expand All @@ -299,7 +301,7 @@ private V compute(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
BiFunction<Tuple<K, V>, Throwable, V> handler = (pair, ex) -> {
V value = null;
if (pair != null) {
cache.put(pair.v1(), valueSerializer.serialize(pair.v2()));
cache.put(pair.v1(), serializeValue(pair.v2()));
value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should
// be safe to assume if we got no exception and reached here.
}
Expand Down Expand Up @@ -399,9 +401,9 @@ public void close() {
*/
class EhCacheKeyIterator<K> implements Iterator<K> {

Iterator<Cache.Entry<K, byte[]>> iterator;
Iterator<Cache.Entry<K, ByteArrayWrapper>> iterator;

EhCacheKeyIterator(Iterator<Cache.Entry<K, byte[]>> iterator) {
EhCacheKeyIterator(Iterator<Cache.Entry<K, ByteArrayWrapper>> iterator) {
this.iterator = iterator;
}

Expand All @@ -427,7 +429,7 @@ public void remove() {
/**
* Wrapper over Ehcache original listener to listen to desired events and notify desired subscribers.
*/
class EhCacheEventListener implements CacheEventListener<K, byte[]> {
class EhCacheEventListener implements CacheEventListener<K, ByteArrayWrapper> {

private final RemovalListener<K, V> removalListener;

Expand All @@ -436,25 +438,25 @@ class EhCacheEventListener implements CacheEventListener<K, byte[]> {
}

@Override
public void onEvent(CacheEvent<? extends K, ? extends byte[]> event) {
public void onEvent(CacheEvent<? extends K, ? extends ByteArrayWrapper> event) {
switch (event.getType()) {
case CREATED:
entries.inc();
assert event.getOldValue() == null;
break;
case EVICTED:
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EVICTED));
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EVICTED));
entries.dec();
assert event.getNewValue() == null;
break;
case REMOVED:
entries.dec();
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EXPLICIT));
this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EXPLICIT));
assert event.getNewValue() == null;
break;
case EXPIRED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.INVALIDATED)
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.INVALIDATED)
);
entries.dec();
assert event.getNewValue() == null;
Expand All @@ -470,9 +472,9 @@ public void onEvent(CacheEvent<? extends K, ? extends byte[]> event) {
/**
* Wrapper over Serializer which is compatible with ehcache's serializer requirements.
*/
private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer<K> {
private Serializer<K, byte[]> serializer;
public KeySerializerWrapper(Serializer<K, byte[]> keySerializer) {
private class KeySerializerWrapper<T> implements org.ehcache.spi.serialization.Serializer<T> {
private Serializer<T, byte[]> serializer;
public KeySerializerWrapper(Serializer<T, byte[]> keySerializer) {
this.serializer = keySerializer;
}

Expand All @@ -482,25 +484,79 @@ public KeySerializerWrapper(Serializer<K, byte[]> keySerializer) {
public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
public ByteBuffer serialize(K object) throws SerializerException {
public ByteBuffer serialize(T object) throws SerializerException {
return ByteBuffer.wrap(serializer.serialize(object));
}

@Override
public K read(ByteBuffer binary) throws ClassNotFoundException, SerializerException {
public T read(ByteBuffer binary) throws ClassNotFoundException, SerializerException {
byte[] arr = new byte[binary.remaining()];
binary.get(arr);
return serializer.deserialize(arr);
}

@Override
public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException, SerializerException {
public boolean equals(T object, ByteBuffer binary) throws ClassNotFoundException, SerializerException {
byte[] arr = new byte[binary.remaining()];
binary.get(arr);
return serializer.equals(object, arr);
}
}

/**
* Wrapper allowing Ehcache to serialize ByteArrayWrapper.
*/
private static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer<ByteArrayWrapper> {
public ByteArrayWrapperSerializer() {
}

// This constructor must be present, but does not have to work as we are not actually persisting the disk
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
public ByteArrayWrapperSerializer(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
public ByteBuffer serialize(ByteArrayWrapper object) throws SerializerException {
return ByteBuffer.wrap(object.value);
}

@Override
public ByteArrayWrapper read(ByteBuffer binary) throws ClassNotFoundException, SerializerException {
byte[] arr = new byte[binary.remaining()];
binary.get(arr);
return new ByteArrayWrapper(arr);
}

@Override
public boolean equals(ByteArrayWrapper object, ByteBuffer binary) throws ClassNotFoundException, SerializerException {
byte[] arr = new byte[binary.remaining()];
binary.get(arr);
return Arrays.equals(arr, object.value);
}
}

/**
* Transform a value from V to ByteArrayWrapper, which can be passed to ehcache.
* @param value the value
* @return the serialized value
*/
private ByteArrayWrapper serializeValue(V value) {
ByteArrayWrapper result = new ByteArrayWrapper(valueSerializer.serialize(value));
return result;
}

/**
* Transform a ByteArrayWrapper, which comes from ehcache, back to V.
* @param binary the serialized value
* @return the deserialized value
*/
private V deserializeValue(ByteArrayWrapper binary) {
if (binary == null) {
return null;
}
return valueSerializer.deserialize(binary.value);
}

/**
* Factory to create an ehcache disk cache.
*/
Expand Down Expand Up @@ -679,4 +735,28 @@ public EhcacheDiskCache<K, V> build() {
return new EhcacheDiskCache<>(this);
}
}

/**
* A wrapper over byte[], with equals() that works using Arrays.equals().
* Necessary due to a bug in Ehcache.
*/
static class ByteArrayWrapper {
private final byte[] value;
public ByteArrayWrapper(byte[] value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (o == null || o.getClass() != ByteArrayWrapper.class) {
return false;
}
ByteArrayWrapper other = (ByteArrayWrapper) o;
return Arrays.equals(this.value, other.value);
}

@Override
public int hashCode() {
return Arrays.hashCode(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

Expand Down Expand Up @@ -565,18 +566,26 @@ public void testBasicGetAndPutBytesReference() throws Exception {
.setValueType(BytesReference.class)
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 2) // bigger so no evictions happen
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setRemovalListener(new MockRemovalListener<>())
.build();
int randomKeys = randomIntBetween(10, 100);
int valueLength = 1000;
int valueLength = 100;
Random rand = Randomness.get();
Map<String, BytesReference> keyValueMap = new HashMap<>();
for (int i = 0; i < randomKeys; i++) {
byte[] valueBytes = new byte[valueLength];
rand.nextBytes(valueBytes);
keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes));

// Test a non-BytesArray implementation of BytesReference.
byte[] compositeBytes1 = new byte[valueLength];
byte[] compositeBytes2 = new byte[valueLength];
rand.nextBytes(compositeBytes1);
rand.nextBytes(compositeBytes2);
BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2));
keyValueMap.put(UUID.randomUUID().toString(), composite);
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue());
Expand Down

0 comments on commit bad40cd

Please sign in to comment.