diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java index 0b9c596745532..24026cb37b743 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java @@ -34,6 +34,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; @@ -93,8 +94,8 @@ public class EhcacheDiskCache implements ICache { // A Cache manager can create many caches. private final PersistentCacheManager cacheManager; - // Disk cache - private Cache cache; + // Disk cache. Because of a bug in ehcache, we have to store ByteArrayWrapper rather than byte[]. + private Cache cache; private final long maxWeightInBytes; private final String storagePath; private final Class keyType; @@ -151,27 +152,27 @@ private EhcacheDiskCache(Builder builder) { this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); } - private Cache buildCache(Duration expireAfterAccess, Builder builder) { + private Cache buildCache(Duration expireAfterAccess, Builder builder) { try { return this.cacheManager.createCache( this.diskCacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( this.keyType, - byte[].class, + ByteArrayWrapper.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) ).withExpiry(new ExpiryPolicy<>() { @Override - public Duration getExpiryForCreation(K key, byte[] value) { + public Duration getExpiryForCreation(K key, ByteArrayWrapper value) { return INFINITE; } @Override - public Duration getExpiryForAccess(K key, Supplier value) { + public Duration getExpiryForAccess(K key, Supplier value) { return expireAfterAccess; } @Override - public Duration getExpiryForUpdate(K key, Supplier oldValue, byte[] newValue) { + public Duration getExpiryForUpdate(K key, Supplier oldValue, ByteArrayWrapper newValue) { return INFINITE; } }) @@ -185,7 +186,8 @@ public Duration getExpiryForUpdate(K key, Supplier oldValue, b (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings) ) ) - .withKeySerializer(new KeySerializerWrapper(keySerializer)) + .withKeySerializer(new KeySerializerWrapper(keySerializer)) + .withValueSerializer(new ByteArrayWrapperSerializer()) ); } catch (IllegalArgumentException ex) { logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage()); @@ -248,7 +250,7 @@ public V get(K key) { } V value; try { - value = valueSerializer.deserialize(cache.get(key)); + value = deserializeValue(cache.get(key)); } catch (CacheLoadingException ex) { throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache"); } @@ -263,7 +265,7 @@ public V get(K key) { @Override public void put(K key, V value) { try { - cache.put(key, valueSerializer.serialize(value)); + cache.put(key, serializeValue(value)); } catch (CacheWritingException ex) { throw new OpenSearchException("Exception occurred while put item to ehcache disk cache"); } @@ -281,7 +283,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Except // Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is // not performant in case there are multiple concurrent request for same key. Below is our own custom // implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation. - V value = valueSerializer.deserialize(cache.get(key)); + V value = deserializeValue(cache.get(key)); if (value == null) { value = compute(key, loader); } @@ -299,7 +301,7 @@ private V compute(K key, LoadAwareCacheLoader loader) throws Exception { BiFunction, Throwable, V> handler = (pair, ex) -> { V value = null; if (pair != null) { - cache.put(pair.v1(), valueSerializer.serialize(pair.v2())); + cache.put(pair.v1(), serializeValue(pair.v2())); value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should // be safe to assume if we got no exception and reached here. } @@ -399,9 +401,9 @@ public void close() { */ class EhCacheKeyIterator implements Iterator { - Iterator> iterator; + Iterator> iterator; - EhCacheKeyIterator(Iterator> iterator) { + EhCacheKeyIterator(Iterator> iterator) { this.iterator = iterator; } @@ -427,7 +429,7 @@ public void remove() { /** * Wrapper over Ehcache original listener to listen to desired events and notify desired subscribers. */ - class EhCacheEventListener implements CacheEventListener { + class EhCacheEventListener implements CacheEventListener { private final RemovalListener removalListener; @@ -436,25 +438,25 @@ class EhCacheEventListener implements CacheEventListener { } @Override - public void onEvent(CacheEvent event) { + public void onEvent(CacheEvent event) { switch (event.getType()) { case CREATED: entries.inc(); assert event.getOldValue() == null; break; case EVICTED: - this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EVICTED)); + this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EVICTED)); entries.dec(); assert event.getNewValue() == null; break; case REMOVED: entries.dec(); - this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EXPLICIT)); + this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EXPLICIT)); assert event.getNewValue() == null; break; case EXPIRED: this.removalListener.onRemoval( - new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.INVALIDATED) + new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.INVALIDATED) ); entries.dec(); assert event.getNewValue() == null; @@ -470,9 +472,9 @@ public void onEvent(CacheEvent event) { /** * Wrapper over Serializer which is compatible with ehcache's serializer requirements. */ - private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer { - private Serializer serializer; - public KeySerializerWrapper(Serializer keySerializer) { + private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer { + private Serializer serializer; + public KeySerializerWrapper(Serializer keySerializer) { this.serializer = keySerializer; } @@ -482,25 +484,79 @@ public KeySerializerWrapper(Serializer keySerializer) { public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} @Override - public ByteBuffer serialize(K object) throws SerializerException { + public ByteBuffer serialize(T object) throws SerializerException { return ByteBuffer.wrap(serializer.serialize(object)); } @Override - public K read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + public T read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { byte[] arr = new byte[binary.remaining()]; binary.get(arr); return serializer.deserialize(arr); } @Override - public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + public boolean equals(T object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { byte[] arr = new byte[binary.remaining()]; binary.get(arr); return serializer.equals(object, arr); } } + /** + * Wrapper allowing Ehcache to serialize ByteArrayWrapper. + */ + private static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer { + public ByteArrayWrapperSerializer() { + } + + // This constructor must be present, but does not have to work as we are not actually persisting the disk + // cache after a restart. + // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches + public ByteArrayWrapperSerializer(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} + + @Override + public ByteBuffer serialize(ByteArrayWrapper object) throws SerializerException { + return ByteBuffer.wrap(object.value); + } + + @Override + public ByteArrayWrapper read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return new ByteArrayWrapper(arr); + } + + @Override + public boolean equals(ByteArrayWrapper object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return Arrays.equals(arr, object.value); + } + } + + /** + * Transform a value from V to ByteArrayWrapper, which can be passed to ehcache. + * @param value the value + * @return the serialized value + */ + private ByteArrayWrapper serializeValue(V value) { + ByteArrayWrapper result = new ByteArrayWrapper(valueSerializer.serialize(value)); + return result; + } + + /** + * Transform a ByteArrayWrapper, which comes from ehcache, back to V. + * @param binary the serialized value + * @return the deserialized value + */ + private V deserializeValue(ByteArrayWrapper binary) { + if (binary == null) { + return null; + } + return valueSerializer.deserialize(binary.value); + } + /** * Factory to create an ehcache disk cache. */ @@ -679,4 +735,28 @@ public EhcacheDiskCache build() { return new EhcacheDiskCache<>(this); } } + + /** + * A wrapper over byte[], with equals() that works using Arrays.equals(). + * Necessary due to a bug in Ehcache. + */ + static class ByteArrayWrapper { + private final byte[] value; + public ByteArrayWrapper(byte[] value) { + this.value = value; + } + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != ByteArrayWrapper.class) { + return false; + } + ByteArrayWrapper other = (ByteArrayWrapper) o; + return Arrays.equals(this.value, other.value); + } + + @Override + public int hashCode() { + return Arrays.hashCode(value); + } + } } diff --git a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java index 97f52d47dffca..f3a06dd05fd3a 100644 --- a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java +++ b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java @@ -23,6 +23,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; import org.opensearch.env.NodeEnvironment; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -565,18 +566,26 @@ public void testBasicGetAndPutBytesReference() throws Exception { .setValueType(BytesReference.class) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) - .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 2) // bigger so no evictions happen + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen .setExpireAfterAccess(TimeValue.MAX_VALUE) .setRemovalListener(new MockRemovalListener<>()) .build(); int randomKeys = randomIntBetween(10, 100); - int valueLength = 1000; + int valueLength = 100; Random rand = Randomness.get(); Map keyValueMap = new HashMap<>(); for (int i = 0; i < randomKeys; i++) { byte[] valueBytes = new byte[valueLength]; rand.nextBytes(valueBytes); keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes)); + + // Test a non-BytesArray implementation of BytesReference. + byte[] compositeBytes1 = new byte[valueLength]; + byte[] compositeBytes2 = new byte[valueLength]; + rand.nextBytes(compositeBytes1); + rand.nextBytes(compositeBytes2); + BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2)); + keyValueMap.put(UUID.randomUUID().toString(), composite); } for (Map.Entry entry : keyValueMap.entrySet()) { ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue());