diff --git a/CHANGELOG.md b/CHANGELOG.md index ec097629eba82..90c51f5b4ee87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) - [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542)) +- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625)) +- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709)) - [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583)) ### Dependencies diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java index a60d44db03f2c..d8a6eb480a5a5 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -14,6 +14,7 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; @@ -106,8 +107,11 @@ public MockDiskCacheFactory(long delay, int maxSize) { } @Override + @SuppressWarnings({ "unchecked" }) public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { - return new Builder().setMaxSize(maxSize) + return new Builder().setKeySerializer((Serializer) config.getKeySerializer()) + .setValueSerializer((Serializer) config.getValueSerializer()) + .setMaxSize(maxSize) .setDeliberateDelay(delay) .setRemovalListener(config.getRemovalListener()) .build(); @@ -123,6 +127,8 @@ public static class Builder extends ICacheBuilder { int maxSize; long delay; + Serializer keySerializer; + Serializer valueSerializer; @Override public ICache build() { @@ -138,5 +144,16 @@ public Builder setDeliberateDelay(long millis) { this.delay = millis; return this; } + + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + } } diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java index 907bbc13df03c..d83a55e60fd2b 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java @@ -20,6 +20,7 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.collect.Tuple; @@ -29,7 +30,9 @@ import org.opensearch.common.unit.TimeValue; import java.io.File; +import java.nio.ByteBuffer; import java.time.Duration; +import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; @@ -50,6 +53,7 @@ import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; import org.ehcache.config.builders.ResourcePoolsBuilder; import org.ehcache.config.units.MemoryUnit; +import org.ehcache.core.spi.service.FileBasedPersistenceContext; import org.ehcache.event.CacheEvent; import org.ehcache.event.CacheEventListener; import org.ehcache.event.EventType; @@ -57,6 +61,7 @@ import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; import org.ehcache.spi.loaderwriter.CacheLoadingException; import org.ehcache.spi.loaderwriter.CacheWritingException; +import org.ehcache.spi.serialization.SerializerException; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_ALIAS_KEY; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY; @@ -89,14 +94,14 @@ public class EhcacheDiskCache implements ICache { // A Cache manager can create many caches. private final PersistentCacheManager cacheManager; - // Disk cache - private Cache cache; + // Disk cache. Using ByteArrayWrapper to compare two byte[] by values rather than the default reference checks + private Cache cache; private final long maxWeightInBytes; private final String storagePath; private final Class keyType; private final Class valueType; private final TimeValue expireAfterAccess; - private final EhCacheEventListener ehCacheEventListener; + private final EhCacheEventListener ehCacheEventListener; private final String threadPoolAlias; private final Settings settings; private final RemovalListener removalListener; @@ -105,6 +110,9 @@ public class EhcacheDiskCache implements ICache { // TODO: Move count to stats once those changes are ready. private final CounterMetric entries = new CounterMetric(); + private final Serializer keySerializer; + private final Serializer valueSerializer; + /** * Used in computeIfAbsent to synchronize loading of a given key. This is needed as ehcache doesn't provide a * computeIfAbsent method. @@ -135,34 +143,36 @@ private EhcacheDiskCache(Builder builder) { this.threadPoolAlias = builder.threadPoolAlias; } this.settings = Objects.requireNonNull(builder.getSettings(), "Settings objects shouldn't be null"); + this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null"); + this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null"); this.cacheManager = buildCacheManager(); Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null"); this.removalListener = builder.getRemovalListener(); - this.ehCacheEventListener = new EhCacheEventListener(builder.getRemovalListener()); + this.ehCacheEventListener = new EhCacheEventListener(builder.getRemovalListener()); this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); } - private Cache buildCache(Duration expireAfterAccess, Builder builder) { + private Cache buildCache(Duration expireAfterAccess, Builder builder) { try { return this.cacheManager.createCache( this.diskCacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( this.keyType, - this.valueType, + ByteArrayWrapper.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) ).withExpiry(new ExpiryPolicy<>() { @Override - public Duration getExpiryForCreation(K key, V value) { + public Duration getExpiryForCreation(K key, ByteArrayWrapper value) { return INFINITE; } @Override - public Duration getExpiryForAccess(K key, Supplier value) { + public Duration getExpiryForAccess(K key, Supplier value) { return expireAfterAccess; } @Override - public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { + public Duration getExpiryForUpdate(K key, Supplier oldValue, ByteArrayWrapper newValue) { return INFINITE; } }) @@ -176,6 +186,12 @@ public Duration getExpiryForUpdate(K key, Supplier oldValue, V newV (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings) ) ) + .withKeySerializer(new KeySerializerWrapper(keySerializer)) + .withValueSerializer(new ByteArrayWrapperSerializer()) + // We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its + // serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization. + // This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization + // before V hits ehcache. ); } catch (IllegalArgumentException ex) { logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage()); @@ -238,7 +254,7 @@ public V get(K key) { } V value; try { - value = cache.get(key); + value = deserializeValue(cache.get(key)); } catch (CacheLoadingException ex) { throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache"); } @@ -253,7 +269,7 @@ public V get(K key) { @Override public void put(K key, V value) { try { - cache.put(key, value); + cache.put(key, serializeValue(value)); } catch (CacheWritingException ex) { throw new OpenSearchException("Exception occurred while put item to ehcache disk cache"); } @@ -271,7 +287,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Except // Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is // not performant in case there are multiple concurrent request for same key. Below is our own custom // implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation. - V value = cache.get(key); + V value = deserializeValue(cache.get(key)); if (value == null) { value = compute(key, loader); } @@ -289,7 +305,7 @@ private V compute(K key, LoadAwareCacheLoader loader) throws Exception { BiFunction, Throwable, V> handler = (pair, ex) -> { V value = null; if (pair != null) { - cache.put(pair.v1(), pair.v2()); + cache.put(pair.v1(), serializeValue(pair.v2())); value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should // be safe to assume if we got no exception and reached here. } @@ -389,9 +405,9 @@ public void close() { */ class EhCacheKeyIterator implements Iterator { - Iterator> iterator; + Iterator> iterator; - EhCacheKeyIterator(Iterator> iterator) { + EhCacheKeyIterator(Iterator> iterator) { this.iterator = iterator; } @@ -416,10 +432,8 @@ public void remove() { /** * Wrapper over Ehcache original listener to listen to desired events and notify desired subscribers. - * @param Type of key - * @param Type of value */ - class EhCacheEventListener implements CacheEventListener { + class EhCacheEventListener implements CacheEventListener { private final RemovalListener removalListener; @@ -428,26 +442,29 @@ class EhCacheEventListener implements CacheEventListener { } @Override - public void onEvent(CacheEvent event) { + public void onEvent(CacheEvent event) { switch (event.getType()) { case CREATED: entries.inc(); - // this.eventListener.onCached(event.getKey(), event.getNewValue(), CacheStoreType.DISK); assert event.getOldValue() == null; break; case EVICTED: - this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED)); + this.removalListener.onRemoval( + new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EVICTED) + ); entries.dec(); assert event.getNewValue() == null; break; case REMOVED: entries.dec(); - this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EXPLICIT)); + this.removalListener.onRemoval( + new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EXPLICIT) + ); assert event.getNewValue() == null; break; case EXPIRED: this.removalListener.onRemoval( - new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED) + new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.INVALIDATED) ); entries.dec(); assert event.getNewValue() == null; @@ -460,6 +477,94 @@ public void onEvent(CacheEvent event) { } } + /** + * Wrapper over Serializer which is compatible with ehcache's serializer requirements. + */ + private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer { + private Serializer serializer; + + public KeySerializerWrapper(Serializer keySerializer) { + this.serializer = keySerializer; + } + + // This constructor must be present, but does not have to work as we are not actually persisting the disk + // cache after a restart. + // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches + public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} + + @Override + public ByteBuffer serialize(T object) throws SerializerException { + return ByteBuffer.wrap(serializer.serialize(object)); + } + + @Override + public T read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.deserialize(arr); + } + + @Override + public boolean equals(T object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.equals(object, arr); + } + } + + /** + * Wrapper allowing Ehcache to serialize ByteArrayWrapper. + */ + private static class ByteArrayWrapperSerializer implements org.ehcache.spi.serialization.Serializer { + public ByteArrayWrapperSerializer() {} + + // This constructor must be present, but does not have to work as we are not actually persisting the disk + // cache after a restart. + // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches + public ByteArrayWrapperSerializer(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} + + @Override + public ByteBuffer serialize(ByteArrayWrapper object) throws SerializerException { + return ByteBuffer.wrap(object.value); + } + + @Override + public ByteArrayWrapper read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return new ByteArrayWrapper(arr); + } + + @Override + public boolean equals(ByteArrayWrapper object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return Arrays.equals(arr, object.value); + } + } + + /** + * Transform a value from V to ByteArrayWrapper, which can be passed to ehcache. + * @param value the value + * @return the serialized value + */ + private ByteArrayWrapper serializeValue(V value) { + ByteArrayWrapper result = new ByteArrayWrapper(valueSerializer.serialize(value)); + return result; + } + + /** + * Transform a ByteArrayWrapper, which comes from ehcache, back to V. + * @param binary the serialized value + * @return the deserialized value + */ + private V deserializeValue(ByteArrayWrapper binary) { + if (binary == null) { + return null; + } + return valueSerializer.deserialize(binary.value); + } + /** * Factory to create an ehcache disk cache. */ @@ -476,15 +581,33 @@ public static class EhcacheDiskCacheFactory implements ICache.Factory { public EhcacheDiskCacheFactory() {} @Override + @SuppressWarnings({ "unchecked" }) // Required to ensure the serializers output byte[] public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { Map> settingList = EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); + + Serializer keySerializer = null; + try { + keySerializer = (Serializer) config.getKeySerializer(); + } catch (ClassCastException e) { + throw new IllegalArgumentException("EhcacheDiskCache requires a key serializer of type Serializer"); + } + + Serializer valueSerializer = null; + try { + valueSerializer = (Serializer) config.getValueSerializer(); + } catch (ClassCastException e) { + throw new IllegalArgumentException("EhcacheDiskCache requires a value serializer of type Serializer"); + } + return new Builder().setStoragePath((String) settingList.get(DISK_STORAGE_PATH_KEY).get(settings)) .setDiskCacheAlias((String) settingList.get(DISK_CACHE_ALIAS_KEY).get(settings)) .setIsEventListenerModeSync((Boolean) settingList.get(DISK_LISTENER_MODE_SYNC_KEY).get(settings)) .setCacheType(cacheType) .setKeyType((config.getKeyType())) .setValueType(config.getValueType()) + .setKeySerializer(keySerializer) + .setValueSerializer(valueSerializer) .setRemovalListener(config.getRemovalListener()) .setExpireAfterAccess((TimeValue) settingList.get(DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY).get(settings)) .setMaximumWeightInBytes((Long) settingList.get(DISK_MAX_SIZE_IN_BYTES_KEY).get(settings)) @@ -518,6 +641,8 @@ public static class Builder extends ICacheBuilder { private Class keyType; private Class valueType; + private Serializer keySerializer; + private Serializer valueSerializer; /** * Default constructor. Added to fix javadocs. @@ -594,9 +719,55 @@ public Builder setIsEventListenerModeSync(boolean isEventListenerModeSync) return this; } + /** + * Sets the key serializer for this cache. + * @param keySerializer the key serializer + * @return builder + */ + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + /** + * Sets the value serializer for this cache. + * @param valueSerializer the value serializer + * @return builder + */ + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + @Override public EhcacheDiskCache build() { return new EhcacheDiskCache<>(this); } } + + /** + * A wrapper over byte[], with equals() that works using Arrays.equals(). + * Necessary due to a bug in Ehcache. + */ + static class ByteArrayWrapper { + private final byte[] value; + + public ByteArrayWrapper(byte[] value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != ByteArrayWrapper.class) { + return false; + } + ByteArrayWrapper other = (ByteArrayWrapper) o; + return Arrays.equals(this.value, other.value); + } + + @Override + public int hashCode() { + return Arrays.hashCode(value); + } + } } diff --git a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java index 35cbab6372961..ac6bf6d6e7c2c 100644 --- a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java +++ b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java @@ -9,24 +9,33 @@ package org.opensearch.cache.store.disk; import org.opensearch.cache.EhcacheDiskCacheSettings; +import org.opensearch.common.Randomness; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.serializer.BytesReferenceSerializer; +import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; import org.opensearch.env.NodeEnvironment; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -51,6 +60,8 @@ public void testBasicGetAndPut() throws IOException { .setIsEventListenerModeSync(true) .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -89,6 +100,8 @@ public void testBasicGetAndPutUsingFactory() throws IOException { new CacheConfig.Builder().setValueType(String.class) .setKeyType(String.class) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setSettings( Settings.builder() .put( @@ -149,6 +162,8 @@ public void testConcurrentPut() throws Exception { .setIsEventListenerModeSync(true) // For accurate count .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -194,6 +209,8 @@ public void testEhcacheParallelGets() throws Exception { .setIsEventListenerModeSync(true) // For accurate count .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -237,6 +254,8 @@ public void testEhcacheKeyIterator() throws Exception { .setIsEventListenerModeSync(true) .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -274,6 +293,8 @@ public void testEvictions() throws Exception { .setThreadPoolAlias("ehcacheTest") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -304,6 +325,8 @@ public void testComputeIfAbsentConcurrently() throws Exception { .setThreadPoolAlias("ehcacheTest") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -373,6 +396,8 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception .setThreadPoolAlias("ehcacheTest") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -430,6 +455,8 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception { .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -491,6 +518,8 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException { .setIsEventListenerModeSync(true) .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) @@ -525,6 +554,50 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException { } + public void testBasicGetAndPutBytesReference() throws Exception { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + ICache ehCacheDiskCachingTier = new EhcacheDiskCache.Builder() + .setThreadPoolAlias("ehcacheTest") + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new BytesReferenceSerializer()) + .setKeyType(String.class) + .setValueType(BytesReference.class) + .setCacheType(CacheType.INDICES_REQUEST_CACHE) + .setSettings(settings) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setRemovalListener(new MockRemovalListener<>()) + .build(); + int randomKeys = randomIntBetween(10, 100); + int valueLength = 100; + Random rand = Randomness.get(); + Map keyValueMap = new HashMap<>(); + for (int i = 0; i < randomKeys; i++) { + byte[] valueBytes = new byte[valueLength]; + rand.nextBytes(valueBytes); + keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes)); + + // Test a non-BytesArray implementation of BytesReference. + byte[] compositeBytes1 = new byte[valueLength]; + byte[] compositeBytes2 = new byte[valueLength]; + rand.nextBytes(compositeBytes1); + rand.nextBytes(compositeBytes2); + BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2)); + keyValueMap.put(UUID.randomUUID().toString(), composite); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + BytesReference value = ehCacheDiskCachingTier.get(entry.getKey()); + assertEquals(entry.getValue(), value); + } + ehCacheDiskCachingTier.close(); + } + } + private static String generateRandomString(int length) { String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; StringBuilder randomString = new StringBuilder(length); @@ -546,4 +619,25 @@ public void onRemoval(RemovalNotification notification) { evictionMetric.inc(); } } + + static class StringSerializer implements Serializer { + private final Charset charset = StandardCharsets.UTF_8; + + @Override + public byte[] serialize(String object) { + return object.getBytes(charset); + } + + @Override + public String deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + return new String(bytes, charset); + } + + public boolean equals(String object, byte[] bytes) { + return object.equals(deserialize(bytes)); + } + } } diff --git a/server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java b/server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java new file mode 100644 index 0000000000000..d1cd872f5801f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; + +import java.util.Arrays; + +/** + * A serializer which transforms BytesReference to byte[]. + * The type of BytesReference is NOT preserved after deserialization, but nothing in opensearch should care. + */ +public class BytesReferenceSerializer implements Serializer { + // This class does not get passed to ehcache itself, so it's not required that classes match after deserialization. + + public BytesReferenceSerializer() {} + + @Override + public byte[] serialize(BytesReference object) { + return BytesReference.toBytes(object); + } + + @Override + public BytesReference deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + return new BytesArray(bytes); + } + + @Override + public boolean equals(BytesReference object, byte[] bytes) { + return Arrays.equals(serialize(object), bytes); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/serializer/Serializer.java b/server/src/main/java/org/opensearch/common/cache/serializer/Serializer.java new file mode 100644 index 0000000000000..35e28707d1ca3 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/serializer/Serializer.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +/** + * Defines an interface for serializers, to be used by pluggable caches. + * T is the class of the original object, and U is the serialized class. + */ +public interface Serializer { + /** + * Serializes an object. + * @param object A non-serialized object. + * @return The serialized representation of the object. + */ + U serialize(T object); + + /** + * Deserializes bytes into an object. + * @param bytes The serialized representation. + * @return The original object. + */ + T deserialize(U bytes); + + /** + * Compares an object to a serialized representation of an object. + * @param object A non-serialized objet + * @param bytes Serialized representation of an object + * @return true if representing the same object, false if not + */ + boolean equals(T object, U bytes); +} diff --git a/server/src/main/java/org/opensearch/common/cache/serializer/package-info.java b/server/src/main/java/org/opensearch/common/cache/serializer/package-info.java new file mode 100644 index 0000000000000..e66a9aa4cf68c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/serializer/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** A package for serializers used in caches. */ +package org.opensearch.common.cache.serializer; diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 6ecb752f91fb9..4c9881e845d42 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -44,6 +45,10 @@ public class CacheConfig { private final RemovalListener removalListener; + // Serializers for keys and values. Not required for all caches. + private final Serializer keySerializer; + private final Serializer valueSerializer; + /** A function which extracts policy-relevant information, such as took time, from values, to allow inspection by policies if present. */ private Function cachedResultParser; /** @@ -62,6 +67,8 @@ private CacheConfig(Builder builder) { this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.keySerializer = builder.keySerializer; + this.valueSerializer = builder.valueSerializer; this.cachedResultParser = builder.cachedResultParser; this.maxSizeInBytes = builder.maxSizeInBytes; this.expireAfterAccess = builder.expireAfterAccess; @@ -83,6 +90,14 @@ public RemovalListener getRemovalListener() { return removalListener; } + public Serializer getKeySerializer() { + return keySerializer; + } + + public Serializer getValueSerializer() { + return valueSerializer; + } + public ToLongBiFunction getWeigher() { return weigher; } @@ -114,6 +129,9 @@ public static class Builder { private RemovalListener removalListener; + private Serializer keySerializer; + private Serializer valueSerializer; + private ToLongBiFunction weigher; private Function cachedResultParser; @@ -143,6 +161,16 @@ public Builder setRemovalListener(RemovalListener removalListener) { return this; } + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + public Builder setWeigher(ToLongBiFunction weigher) { this.weigher = weigher; return this; diff --git a/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java b/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java new file mode 100644 index 0000000000000..781f5765d8da8 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.cache.serializer.Serializer; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; + +import java.io.IOException; +import java.util.Arrays; + +/** + * This class serializes the IndicesRequestCache.Key using its writeTo method. + */ +public class IRCKeyWriteableSerializer implements Serializer { + + public IRCKeyWriteableSerializer() {} + + @Override + public byte[] serialize(IndicesRequestCache.Key object) { + if (object == null) { + return null; + } + try { + BytesStreamOutput os = new BytesStreamOutput(); + object.writeTo(os); + return BytesReference.toBytes(os.bytes()); + } catch (IOException e) { + throw new OpenSearchException("Unable to serialize IndicesRequestCache.Key", e); + } + } + + @Override + public IndicesRequestCache.Key deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + try { + BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length); + return new IndicesRequestCache.Key(is); + } catch (IOException e) { + throw new OpenSearchException("Unable to deserialize byte[] to IndicesRequestCache.Key", e); + } + } + + @Override + public boolean equals(IndicesRequestCache.Key object, byte[] bytes) { + // Deserialization is much slower than serialization for keys of order 1 KB, + // while time to serialize is fairly constant (per byte) + if (bytes.length < 5000) { + return Arrays.equals(serialize(object), bytes); + } else { + return object.equals(deserialize(bytes)); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d22f131853a78..dee7b4c15ece3 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -45,6 +45,7 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.cache.serializer.BytesReferenceSerializer; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -145,6 +146,8 @@ public final class IndicesRequestCache implements RemovalListener