From b196615f2e26c51bc7d7694de6a34c2e1883b9c8 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Thu, 19 Oct 2023 10:20:13 -0700 Subject: [PATCH] [Tiered caching] Framework changes Signed-off-by: Sagar Upadhyaya --- .../org/opensearch/common/cache/Cache.java | 112 +++-- .../common/cache/RemovalNotification.java | 12 + .../cache/request/ShardRequestCache.java | 48 +- .../AbstractIndexShardCacheEntity.java | 19 +- .../org/opensearch/indices/CachingTier.java | 45 ++ .../opensearch/indices/DiskCachingTier.java | 18 + .../indices/DummyDiskCachingTier.java | 58 +++ .../indices/IndicesRequestCache.java | 73 +-- .../opensearch/indices/OnHeapCachingTier.java | 16 + .../indices/OpenSearchOnHeapCache.java | 124 +++++ .../java/org/opensearch/indices/TierType.java | 15 + .../indices/TieredCacheEventListener.java | 22 + .../opensearch/indices/TieredCacheLoader.java | 15 + .../indices/TieredCacheService.java | 34 ++ .../TieredCacheSpilloverStrategyService.java | 231 +++++++++ .../indices/IndicesServiceCloseTests.java | 6 +- ...redCacheSpilloverStrategyServiceTests.java | 458 ++++++++++++++++++ 17 files changed, 1198 insertions(+), 108 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/CachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/DiskCachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java create mode 100644 server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java create mode 100644 server/src/main/java/org/opensearch/indices/TierType.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheLoader.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheService.java create mode 100644 server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java create mode 100644 server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 0b2b608b55df0..9e91866cde2df 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -422,68 +422,74 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept } }); if (value == null) { - // we need to synchronize loading of a value for a given key; however, holding the segment lock while - // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we - // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding - // the segment lock; to do this, we atomically put a future in the map that can load the value, and then - // get the value from this future on the thread that won the race to place the future into the segment map - CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); + value = compute(key, loader); + } + return value; + } - try (ReleasableLock ignored = segment.writeLock.acquire()) { - future = segment.map.putIfAbsent(key, completableFuture); - } + public V compute(K key, CacheLoader loader) throws ExecutionException { + long now = now(); + // we need to synchronize loading of a value for a given key; however, holding the segment lock while + // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we + // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding + // the segment lock; to do this, we atomically put a future in the map that can load the value, and then + // get the value from this future on the thread that won the race to place the future into the segment map + CacheSegment segment = getCacheSegment(key); + CompletableFuture> future; + CompletableFuture> completableFuture = new CompletableFuture<>(); - BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { - if (ok != null) { - try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); - } - return ok.value; - } else { - try (ReleasableLock ignored = segment.writeLock.acquire()) { - CompletableFuture> sanity = segment.map.get(key); - if (sanity != null && sanity.isCompletedExceptionally()) { - segment.map.remove(key); - } - } - return null; - } - }; + try (ReleasableLock ignored = segment.writeLock.acquire()) { + future = segment.map.putIfAbsent(key, completableFuture); + } - CompletableFuture completableValue; - if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V loaded; - try { - loaded = loader.load(key); - } catch (Exception e) { - future.completeExceptionally(e); - throw new ExecutionException(e); - } - if (loaded == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); - future.completeExceptionally(npe); - throw new ExecutionException(npe); - } else { - future.complete(new Entry<>(key, loaded, now)); + BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { + if (ok != null) { + try (ReleasableLock ignored = lruLock.acquire()) { + promote(ok, now); } + return ok.value; } else { - completableValue = future.handle(handler); + try (ReleasableLock ignored = segment.writeLock.acquire()) { + CompletableFuture> sanity = segment.map.get(key); + if (sanity != null && sanity.isCompletedExceptionally()) { + segment.map.remove(key); + } + } + return null; } + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V loaded; try { - value = completableValue.get(); - // check to ensure the future hasn't been completed with an exception - if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers - throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); + loaded = loader.load(key); + } catch (Exception e) { + future.completeExceptionally(e); + throw new ExecutionException(e); } + if (loaded == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Entry<>(key, loaded, now)); + } + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } return value; } diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java index 6d355b2122460..71e240064c6ae 100644 --- a/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java +++ b/server/src/main/java/org/opensearch/common/cache/RemovalNotification.java @@ -32,6 +32,8 @@ package org.opensearch.common.cache; +import org.opensearch.indices.TierType; + /** * Notification when an element is removed from the cache * @@ -42,11 +44,17 @@ public class RemovalNotification { private final K key; private final V value; private final RemovalReason removalReason; + private final TierType tierType; public RemovalNotification(K key, V value, RemovalReason removalReason) { + this(key, value, removalReason, TierType.ON_HEAP); + } + + public RemovalNotification(K key, V value, RemovalReason removalReason, TierType tierType) { this.key = key; this.value = value; this.removalReason = removalReason; + this.tierType = tierType; } public K getKey() { @@ -60,4 +68,8 @@ public V getValue() { public RemovalReason getRemovalReason() { return removalReason; } + + public TierType getTierType() { + return tierType; + } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index b13eec79c2be8..1beef5217355f 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -35,6 +35,9 @@ import org.apache.lucene.util.Accountable; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.indices.TierType; + +import java.util.EnumMap; /** * Tracks the portion of the request cache in use for a particular shard. @@ -43,30 +46,39 @@ */ public final class ShardRequestCache { - final CounterMetric evictionsMetric = new CounterMetric(); - final CounterMetric totalMetric = new CounterMetric(); - final CounterMetric hitCount = new CounterMetric(); - final CounterMetric missCount = new CounterMetric(); + private EnumMap statsHolder = new EnumMap<>(TierType.class); + + public ShardRequestCache() { + for (TierType tierType : TierType.values()) { + statsHolder.put(tierType, new StatsHolder()); + } + } public RequestCacheStats stats() { - return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count()); + // TODO: Change RequestCacheStats to support disk tier stats. + return new RequestCacheStats( + statsHolder.get(TierType.ON_HEAP).totalMetric.count(), + statsHolder.get(TierType.ON_HEAP).evictionsMetric.count(), + statsHolder.get(TierType.ON_HEAP).hitCount.count(), + statsHolder.get(TierType.ON_HEAP).missCount.count() + ); } - public void onHit() { - hitCount.inc(); + public void onHit(TierType tierType) { + statsHolder.get(tierType).hitCount.inc(); } - public void onMiss() { - missCount.inc(); + public void onMiss(TierType tierType) { + statsHolder.get(tierType).missCount.inc(); } - public void onCached(Accountable key, BytesReference value) { - totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + public void onCached(Accountable key, BytesReference value, TierType tierType) { + statsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); } - public void onRemoval(Accountable key, BytesReference value, boolean evicted) { + public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) { if (evicted) { - evictionsMetric.inc(); + statsHolder.get(tierType).evictionsMetric.inc(); } long dec = 0; if (key != null) { @@ -75,6 +87,14 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) { if (value != null) { dec += value.ramBytesUsed(); } - totalMetric.dec(dec); + statsHolder.get(tierType).totalMetric.dec(dec); + } + + static class StatsHolder { + + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric totalMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); } } diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index bb1201cb910a9..2eef16df2bb9a 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -51,22 +51,27 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach protected abstract ShardRequestCache stats(); @Override - public final void onCached(IndicesRequestCache.Key key, BytesReference value) { - stats().onCached(key, value); + public final void onCached(IndicesRequestCache.Key key, BytesReference value, TierType tierType) { + stats().onCached(key, value, tierType); } @Override - public final void onHit() { - stats().onHit(); + public final void onHit(TierType tierType) { + stats().onHit(tierType); } @Override - public final void onMiss() { - stats().onMiss(); + public final void onMiss(TierType tierType) { + stats().onMiss(tierType); } @Override public final void onRemoval(RemovalNotification notification) { - stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED); + stats().onRemoval( + notification.getKey(), + notification.getValue(), + notification.getRemovalReason() == RemovalReason.EVICTED, + notification.getTierType() + ); } } diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java new file mode 100644 index 0000000000000..7bf7294467114 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/CachingTier.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; + +/** + * Caching tier interface. Can be implemented/extended by concrete classes to provide different flavors of cache like + * onHeap, disk etc. + * @param Type of key + * @param Type of value + */ +public interface CachingTier { + + V get(K key); + + void put(K key, V value); + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + void invalidate(K key); + + V compute(K key, TieredCacheLoader loader) throws Exception; + + void setRemovalListener(RemovalListener removalListener); + + void invalidateAll(); + + Iterable keys(); + + int count(); + + TierType getTierType(); + + /** + * Force any outstanding size-based and time-based evictions to occur + */ + default void refresh() {} +} diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java new file mode 100644 index 0000000000000..abc8952c0b8e2 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +/** + * This is specific to disk caching tier and can be used to add methods which are specific to disk tier. + * @param Type of key + * @param Type of value + */ +public interface DiskCachingTier extends CachingTier { + +} diff --git a/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java new file mode 100644 index 0000000000000..26a78b6c61920 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/DummyDiskCachingTier.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; + +import java.util.Collections; + +public class DummyDiskCachingTier implements CachingTier { + + @Override + public V get(K key) { + return null; + } + + @Override + public void put(K key, V value) {} + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + return null; + } + + @Override + public void invalidate(K key) {} + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + return null; + } + + @Override + public void setRemovalListener(RemovalListener removalListener) {} + + @Override + public void invalidateAll() {} + + @Override + public Iterable keys() { + return Collections::emptyIterator; + } + + @Override + public int count() { + return 0; + } + + @Override + public TierType getTierType() { + return TierType.DISK; + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..6781d3fa0eb2a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,10 +39,6 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.cache.CacheLoader; -import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; @@ -78,7 +74,7 @@ * * @opensearch.internal */ -public final class IndicesRequestCache implements RemovalListener, Closeable { +public final class IndicesRequestCache implements TieredCacheEventListener, Closeable { private static final Logger logger = LogManager.getLogger(IndicesRequestCache.class); @@ -107,25 +103,27 @@ public final class IndicesRequestCache implements RemovalListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - private final Cache cache; + private final TieredCacheService tieredCacheService; IndicesRequestCache(Settings settings) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(sizeInBytes) - .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .removalListener(this); - if (expire != null) { - cacheBuilder.setExpireAfterAccess(expire); - } - cache = cacheBuilder.build(); + + // Initialize onHeap cache tier first. + OnHeapCachingTier openSearchOnHeapCache = new OpenSearchOnHeapCache.Builder().setWeigher( + (k, v) -> k.ramBytesUsed() + v.ramBytesUsed() + ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); + + // Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on. + tieredCacheService = new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier( + openSearchOnHeapCache + ).setTieredCacheEventListener(this).build(); } @Override public void close() { - cache.invalidateAll(); + tieredCacheService.invalidateAll(); } void clear(CacheEntity entity) { @@ -133,11 +131,26 @@ void clear(CacheEntity entity) { cleanCache(); } + @Override + public void onMiss(Key key, TierType tierType) { + key.entity.onMiss(tierType); + } + @Override public void onRemoval(RemovalNotification notification) { notification.getKey().entity.onRemoval(notification); } + @Override + public void onHit(Key key, BytesReference value, TierType tierType) { + key.entity.onHit(tierType); + } + + @Override + public void onCached(Key key, BytesReference value, TierType tierType) { + key.entity.onCached(key, value, tierType); + } + BytesReference getOrCompute( CacheEntity cacheEntity, CheckedSupplier loader, @@ -147,9 +160,8 @@ BytesReference getOrCompute( assert reader.getReaderCacheHelper() != null; final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); Loader cacheLoader = new Loader(cacheEntity, loader); - BytesReference value = cache.computeIfAbsent(key, cacheLoader); + BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); if (!registeredClosedListeners.containsKey(cleanupKey)) { @@ -158,8 +170,6 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } - } else { - key.entity.onHit(); } return value; } @@ -172,7 +182,7 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + tieredCacheService.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); } /** @@ -180,7 +190,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - private static class Loader implements CacheLoader { + private static class Loader implements org.opensearch.indices.TieredCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -198,7 +208,6 @@ public boolean isLoaded() { @Override public BytesReference load(Key key) throws Exception { BytesReference value = loader.get(); - entity.onCached(key, value); loaded = true; return value; } @@ -212,7 +221,7 @@ interface CacheEntity extends Accountable { /** * Called after the value was loaded. */ - void onCached(Key key, BytesReference value); + void onCached(Key key, BytesReference value, TierType tierType); /** * Returns true iff the resource behind this entity is still open ie. @@ -229,12 +238,12 @@ interface CacheEntity extends Accountable { /** * Called each time this entity has a cache hit. */ - void onHit(); + void onHit(TierType tierType); /** * Called each time this entity has a cache miss. */ - void onMiss(); + void onMiss(TierType tierType); /** * Called when this entity instance is removed @@ -247,7 +256,7 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { + public static class Key implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality @@ -328,6 +337,9 @@ public int hashCode() { } } + /** + * Logic to clean up in-memory cache. + */ synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); @@ -344,7 +356,7 @@ synchronized void cleanCache() { } } if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { - for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { + for (Iterator iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); @@ -355,15 +367,14 @@ synchronized void cleanCache() { } } } - - cache.refresh(); + tieredCacheService.getOnHeapCachingTier().refresh(); } /** * Returns the current size of the cache */ - int count() { - return cache.count(); + long count() { + return tieredCacheService.count(); } int numRegisteredCloseListeners() { // for testing diff --git a/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java b/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java new file mode 100644 index 0000000000000..f7d68c27c1904 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/OnHeapCachingTier.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +/** + * This is specific to onHeap caching tier and can be used to add methods which are specific to this tier. + * @param Type of key + * @param Type of value + */ +public interface OnHeapCachingTier extends CachingTier {} diff --git a/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java new file mode 100644 index 0000000000000..189b741ed1696 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/OpenSearchOnHeapCache.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.unit.TimeValue; + +import java.util.concurrent.ExecutionException; +import java.util.function.ToLongBiFunction; + +public class OpenSearchOnHeapCache implements OnHeapCachingTier, RemovalListener { + + private final Cache cache; + private RemovalListener removalListener; + + private OpenSearchOnHeapCache(long maxWeightInBytes, ToLongBiFunction weigher, TimeValue expireAfterAcess) { + CacheBuilder cacheBuilder = CacheBuilder.builder() + .setMaximumWeight(maxWeightInBytes) + .weigher(weigher) + .removalListener(this); + if (expireAfterAcess != null) { + cacheBuilder.setExpireAfterAccess(expireAfterAcess); + } + cache = cacheBuilder.build(); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + cache.invalidateAll(); + } + + @Override + public Iterable keys() { + return this.cache.keys(); + } + + @Override + public int count() { + return cache.count(); + } + + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } + + @Override + public V get(K key) { + return cache.get(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws ExecutionException { + return cache.computeIfAbsent(key, key1 -> loader.load(key)); + } + + @Override + public void invalidate(K key) { + cache.invalidate(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + return cache.compute(key, key1 -> loader.load(key)); + } + + @Override + public void refresh() { + cache.refresh(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + removalListener.onRemoval(notification); + } + + public static class Builder { + private long maxWeightInBytes; + + private ToLongBiFunction weigher; + + private TimeValue expireAfterAcess; + + public Builder() {} + + public Builder setMaximumWeight(long sizeInBytes) { + this.maxWeightInBytes = sizeInBytes; + return this; + } + + public Builder setWeigher(ToLongBiFunction weigher) { + this.weigher = weigher; + return this; + } + + public Builder setExpireAfterAccess(TimeValue expireAfterAcess) { + this.expireAfterAcess = expireAfterAcess; + return this; + } + + public OpenSearchOnHeapCache build() { + return new OpenSearchOnHeapCache(maxWeightInBytes, weigher, expireAfterAcess); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/TierType.java b/server/src/main/java/org/opensearch/indices/TierType.java new file mode 100644 index 0000000000000..9a286fd26151b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TierType.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public enum TierType { + + ON_HEAP, + DISK; +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java b/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java new file mode 100644 index 0000000000000..084ac5a57e0d3 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalNotification; + +public interface TieredCacheEventListener { + + void onMiss(K key, TierType tierType); + + void onRemoval(RemovalNotification notification); + + void onHit(K key, V value, TierType tierType); + + void onCached(K key, V value, TierType tierType); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java b/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java new file mode 100644 index 0000000000000..f6bb1a74e973e --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheLoader.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +public interface TieredCacheLoader { + V load(K key) throws Exception; + + boolean isLoaded(); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheService.java b/server/src/main/java/org/opensearch/indices/TieredCacheService.java new file mode 100644 index 0000000000000..59e5e0e00b6c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheService.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import java.util.Optional; + +/** + * This service encapsulates all logic to write/fetch to/from appropriate tiers. Can be implemented with different + * flavors like spillover etc. + * @param Type of key + * @param Type of value + */ +public interface TieredCacheService { + + V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; + + V get(K key); + + void invalidate(K key); + + void invalidateAll(); + + long count(); + + OnHeapCachingTier getOnHeapCachingTier(); + + Optional> getDiskCachingTier(); +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java new file mode 100644 index 0000000000000..7799170a1ede9 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyService.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * This service spillover the evicted items from upper tier to lower tier. For now, we are spilling the in-memory + * cache items to disk tier cache. + * @param Type of key + * @param Type of value + */ +public class TieredCacheSpilloverStrategyService implements TieredCacheService, RemovalListener { + + private final OnHeapCachingTier onHeapCachingTier; + + /** + * Optional in case tiered caching is turned off. + */ + private final Optional> diskCachingTier; + private final TieredCacheEventListener tieredCacheEventListener; + + /** + * Maintains caching tiers in order of get calls. + */ + private final List> cachingTierList; + + private TieredCacheSpilloverStrategyService( + OnHeapCachingTier onHeapCachingTier, + DiskCachingTier diskCachingTier, + TieredCacheEventListener tieredCacheEventListener + ) { + this.onHeapCachingTier = Objects.requireNonNull(onHeapCachingTier); + this.diskCachingTier = Optional.ofNullable(diskCachingTier); + this.tieredCacheEventListener = Objects.requireNonNull(tieredCacheEventListener); + this.cachingTierList = this.diskCachingTier.map(diskTier -> Arrays.asList(onHeapCachingTier, diskTier)) + .orElse(List.of(onHeapCachingTier)); + setRemovalListeners(); + } + + /** + * This method logic is divided into 2 parts: + * 1. First check whether key is present or not in desired tier. If yes, return the value. + * 2. If the key is not present, then add the key/value pair to onHeap cache. + * @param key Key for lookup. + * @param loader Used to load value in case it is not present in any tier. + * @return value + * @throws Exception + */ + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + CacheValue cacheValue = getValueFromTierCache(true).apply(key); + if (cacheValue == null) { + // Add the value to the onHeap cache. Any items if evicted will be moved to lower tier. + V value = onHeapCachingTier.compute(key, loader); + tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); + return value; + } + return cacheValue.value; + } + + @Override + public V get(K key) { + CacheValue cacheValue = getValueFromTierCache(true).apply(key); + if (cacheValue == null) { + return null; + } + return cacheValue.value; + } + + /** + * First fetches the tier type which has this key. And then invalidate accordingly. + * @param key + */ + @Override + public void invalidate(K key) { + // We don't need to track hits/misses in this case. + CacheValue cacheValue = getValueFromTierCache(false).apply(key); + if (cacheValue != null) { + switch (cacheValue.source) { + case ON_HEAP: + onHeapCachingTier.invalidate(key); + break; + case DISK: + diskCachingTier.ifPresent(diskTier -> diskTier.invalidate(key)); + break; + default: + break; + } + } + } + + @Override + public void invalidateAll() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.invalidateAll(); + } + } + + /** + * Returns the total count of items present in all cache tiers. + * @return total count of items in cache + */ + @Override + public long count() { + long totalCount = 0; + for (CachingTier cachingTier : cachingTierList) { + totalCount += cachingTier.count(); + } + return totalCount; + } + + /** + * Called whenever an item is evicted from any cache tier. If the item was evicted from onHeap cache, it is moved + * to disk tier cache. In case it was evicted from disk tier cache, it will discarded. + * @param notification Contains info about the removal like reason, key/value etc. + */ + @Override + public void onRemoval(RemovalNotification notification) { + if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { + switch (notification.getTierType()) { + case ON_HEAP: + diskCachingTier.ifPresent(diskTier -> { + diskTier.put(notification.getKey(), notification.getValue()); + tieredCacheEventListener.onCached(notification.getKey(), notification.getValue(), TierType.DISK); + }); + break; + default: + break; + } + } + tieredCacheEventListener.onRemoval(notification); + } + + @Override + public OnHeapCachingTier getOnHeapCachingTier() { + return this.onHeapCachingTier; + } + + @Override + public Optional> getDiskCachingTier() { + return this.diskCachingTier; + } + + /** + * Register this service as a listener to removal events from different caching tiers. + */ + private void setRemovalListeners() { + for (CachingTier cachingTier : cachingTierList) { + cachingTier.setRemovalListener(this); + } + } + + private Function> getValueFromTierCache(boolean trackStats) { + return key -> { + for (CachingTier cachingTier : cachingTierList) { + V value = cachingTier.get(key); + if (value != null) { + if (trackStats) { + tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); + } + return new CacheValue<>(value, cachingTier.getTierType()); + } + if (trackStats) { + tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); + } + } + return null; + }; + } + + /** + * Represents a cache value along with its associated tier type where it is stored. + * @param Type of value. + */ + public static class CacheValue { + V value; + TierType source; + + CacheValue(V value, TierType source) { + this.value = value; + this.source = source; + } + } + + public static class Builder { + private OnHeapCachingTier onHeapCachingTier; + private DiskCachingTier diskCachingTier; + private TieredCacheEventListener tieredCacheEventListener; + + public Builder() {} + + public Builder setOnHeapCachingTier(OnHeapCachingTier onHeapCachingTier) { + this.onHeapCachingTier = onHeapCachingTier; + return this; + } + + public Builder setOnDiskCachingTier(DiskCachingTier diskCachingTier) { + this.diskCachingTier = diskCachingTier; + return this; + } + + public Builder setTieredCacheEventListener(TieredCacheEventListener tieredCacheEventListener) { + this.tieredCacheEventListener = tieredCacheEventListener; + return this; + } + + public TieredCacheSpilloverStrategyService build() { + return new TieredCacheSpilloverStrategyService( + this.onHeapCachingTier, + this.diskCachingTier, + this.tieredCacheEventListener + ); + } + } + +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 415844dccb611..2fe1f87f5d828 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -321,7 +321,7 @@ public long ramBytesUsed() { } @Override - public void onCached(Key key, BytesReference value) {} + public void onCached(Key key, BytesReference value, TierType tierType) {} @Override public boolean isOpen() { @@ -334,10 +334,10 @@ public Object getCacheIdentity() { } @Override - public void onHit() {} + public void onHit(TierType tierType) {} @Override - public void onMiss() {} + public void onMiss(TierType tierType) {} @Override public void onRemoval(RemovalNotification notification) {} diff --git a/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java b/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java new file mode 100644 index 0000000000000..4c4c7f195ba31 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/TieredCacheSpilloverStrategyServiceTests.java @@ -0,0 +1,458 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class TieredCacheSpilloverStrategyServiceTests extends OpenSearchTestCase { + + public void testComputeAndAbsentWithoutAnyOnHeapCacheEviction() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + randomIntBetween(1, 4), + eventListener + ); + int numOfItems1 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + List keys = new ArrayList<>(); + // Put values in cache. + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + keys.add(key); + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + int cacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + // Hit cache with stored key + cacheHit++; + int index = randomIntBetween(0, keys.size() - 1); + spilloverStrategyService.computeIfAbsent(keys.get(index), getTieredCacheLoader()); + } else { + // Hit cache with randomized key which is expected to miss cache always. + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), getTieredCacheLoader()); + cacheMiss++; + } + } + assertEquals(cacheHit, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(numOfItems1 + cacheMiss, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count()); + } + + public void testComputeAndAbsentWithEvictionsFromOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + + assertEquals( + eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count(), + eventListener.enumMap.get(TierType.DISK).cachedCount.count() + ); + assertEquals(diskTierKeys.size(), eventListener.enumMap.get(TierType.DISK).cachedCount.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(50, 200); + int onHeapCacheHit = 0; + int diskCacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { // Hit cache with key stored in onHeap cache. + onHeapCacheHit++; + int index = randomIntBetween(0, onHeapKeys.size() - 1); + spilloverStrategyService.computeIfAbsent(onHeapKeys.get(index), getTieredCacheLoader()); + } else { // Hit cache with key stored in disk cache. + diskCacheHit++; + int index = randomIntBetween(0, diskTierKeys.size() - 1); + spilloverStrategyService.computeIfAbsent(diskTierKeys.get(index), getTieredCacheLoader()); + } + } else { + // Hit cache with randomized key which is expected to miss cache always. + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + cacheMiss++; + } + } + // On heap cache misses would also include diskCacheHits as it means it missed onHeap cache. + assertEquals(numOfItems1 + cacheMiss + diskCacheHit, eventListener.enumMap.get(TierType.ON_HEAP).missCount.count()); + assertEquals(onHeapCacheHit, eventListener.enumMap.get(TierType.ON_HEAP).hitCount.count()); + assertEquals(cacheMiss + numOfItems1, eventListener.enumMap.get(TierType.DISK).missCount.count()); + assertEquals(diskCacheHit, eventListener.enumMap.get(TierType.DISK).hitCount.count()); + } + + public void testComputeAndAbsentWithEvictionsFromBothTier() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + int numOfItems = randomIntBetween(totalSize + 1, totalSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + assertTrue(eventListener.enumMap.get(TierType.DISK).evictionsMetric.count() > 0); + } + + public void testGetAndCount() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = intializeTieredCacheService( + onHeapCacheSize, + diskCacheSize, + eventListener + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(key, tieredCacheLoader); + } + + for (int iter = 0; iter < numOfItems1; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { + int index = randomIntBetween(0, onHeapKeys.size() - 1); + assertNotNull(spilloverStrategyService.get(onHeapKeys.get(index))); + } else { + int index = randomIntBetween(0, diskTierKeys.size() - 1); + assertNotNull(spilloverStrategyService.get(diskTierKeys.get(index))); + } + } else { + assertNull(spilloverStrategyService.get(UUID.randomUUID().toString())); + } + } + assertEquals(numOfItems1, spilloverStrategyService.count()); + } + + public void testWithDiskTierNull() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockTieredCacheEventListener eventListener = new MockTieredCacheEventListener(); + TieredCacheSpilloverStrategyService spilloverStrategyService = new TieredCacheSpilloverStrategyService.Builder< + String, + String>().setOnHeapCachingTier(new MockOnHeapCacheTier<>(onHeapCacheSize)).setTieredCacheEventListener(eventListener).build(); + int numOfItems = randomIntBetween(onHeapCacheSize + 1, onHeapCacheSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + TieredCacheLoader tieredCacheLoader = getTieredCacheLoader(); + spilloverStrategyService.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(TierType.ON_HEAP).evictionsMetric.count() > 0); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).cachedCount.count()); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).evictionsMetric.count()); + assertEquals(0, eventListener.enumMap.get(TierType.DISK).missCount.count()); + } + + private TieredCacheLoader getTieredCacheLoader() { + return new TieredCacheLoader() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + return UUID.randomUUID().toString(); + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + + private TieredCacheSpilloverStrategyService intializeTieredCacheService( + int onHeapCacheSize, + int diksCacheSize, + TieredCacheEventListener cacheEventListener + ) { + DiskCachingTier diskCache = new MockDiskCachingTier<>(diksCacheSize); + OnHeapCachingTier openSearchOnHeapCache = new MockOnHeapCacheTier<>(onHeapCacheSize); + return new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier(openSearchOnHeapCache) + .setOnDiskCachingTier(diskCache) + .setTieredCacheEventListener(cacheEventListener) + .build(); + } + + class MockOnHeapCacheTier implements OnHeapCachingTier, RemovalListener { + + Map onHeapCacheTier; + int maxSize; + private RemovalListener removalListener; + + MockOnHeapCacheTier(int size) { + maxSize = size; + this.onHeapCacheTier = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + return this.onHeapCacheTier.get(key); + } + + @Override + public void put(K key, V value) { + this.onHeapCacheTier.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + if (this.onHeapCacheTier.size() > maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.ON_HEAP)); + return loader.load(key); + } + return this.onHeapCacheTier.computeIfAbsent(key, k -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void invalidate(K key) { + this.onHeapCacheTier.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + if (this.onHeapCacheTier.size() >= maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.ON_HEAP)); + return loader.load(key); + } + return this.onHeapCacheTier.compute(key, ((k, v) -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + this.onHeapCacheTier.clear(); + } + + @Override + public Iterable keys() { + return this.onHeapCacheTier.keySet(); + } + + @Override + public int count() { + return this.onHeapCacheTier.size(); + } + + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } + + @Override + public void onRemoval(RemovalNotification notification) { + removalListener.onRemoval(notification); + } + } + + class MockTieredCacheEventListener implements TieredCacheEventListener { + + EnumMap enumMap = new EnumMap<>(TierType.class); + + MockTieredCacheEventListener() { + for (TierType tierType : TierType.values()) { + enumMap.put(tierType, new TestStatsHolder()); + } + } + + @Override + public void onMiss(K key, TierType tierType) { + enumMap.get(tierType).missCount.inc(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + if (notification.getRemovalReason().equals(RemovalReason.EVICTED)) { + enumMap.get(notification.getTierType()).evictionsMetric.inc(); + } + } + + @Override + public void onHit(K key, V value, TierType tierType) { + enumMap.get(tierType).hitCount.inc(); + } + + @Override + public void onCached(K key, V value, TierType tierType) { + enumMap.get(tierType).cachedCount.inc(); + } + + class TestStatsHolder { + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); + + final CounterMetric cachedCount = new CounterMetric(); + } + } + + class MockDiskCachingTier implements DiskCachingTier, RemovalListener { + Map diskTier; + private RemovalListener removalListener; + int maxSize; + + MockDiskCachingTier(int size) { + this.maxSize = size; + diskTier = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + return this.diskTier.get(key); + } + + @Override + public void put(K key, V value) { + if (this.diskTier.size() >= maxSize) { // For simplification + onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED, TierType.DISK)); + return; + } + this.diskTier.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + return this.diskTier.computeIfAbsent(key, k -> { + try { + return loader.load(k); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void invalidate(K key) { + this.diskTier.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + if (this.diskTier.size() >= maxSize) { // If it exceeds, just notify for evict. + onRemoval(new RemovalNotification<>(key, loader.load(key), RemovalReason.EVICTED, TierType.DISK)); + return loader.load(key); + } + return this.diskTier.compute(key, (k, v) -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; + } + + @Override + public void invalidateAll() { + this.diskTier.clear(); + } + + @Override + public Iterable keys() { + return null; + } + + @Override + public int count() { + return this.diskTier.size(); + } + + @Override + public TierType getTierType() { + return TierType.DISK; + } + + @Override + public void onRemoval(RemovalNotification notification) { + this.removalListener.onRemoval(notification); + } + } +}