diff --git a/CHANGELOG.md b/CHANGELOG.md index 33866f8ae602f..598121e69c69a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561)) - Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103)) - [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642)) +- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) +- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542)) ### Dependencies - Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290)) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java new file mode 100644 index 0000000000000..96ef027c17187 --- /dev/null +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cache.common.policy; + +import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.unit.TimeValue; + +import java.util.function.Function; +import java.util.function.Predicate; + +/** + * A cache tier policy which accepts queries whose took time is greater than some threshold. + * The threshold should be set to approximately the time it takes to get a result from the cache tier. + * The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed + * to decide whether to admit the value. + * @param The type of data consumed by test(). + */ +public class TookTimePolicy implements Predicate { + /** + * The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through. + */ + private final TimeValue threshold; + + /** + * Function which extracts the relevant PolicyValues from a serialized CachedQueryResult + */ + private final Function cachedResultParser; + + /** + * Constructs a took time policy. + * @param threshold the threshold + * @param cachedResultParser the function providing policy values + */ + public TookTimePolicy(TimeValue threshold, Function cachedResultParser) { + if (threshold.compareTo(TimeValue.ZERO) < 0) { + throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep()); + } + this.threshold = threshold; + this.cachedResultParser = cachedResultParser; + } + + /** + * Check whether to admit data. + * @param data the input argument + * @return whether to admit the data + */ + public boolean test(V data) { + long tookTimeNanos; + try { + tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos(); + } catch (Exception e) { + // If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data + return false; + } + + TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos); + return tookTime.compareTo(threshold) >= 0; + } +} diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java new file mode 100644 index 0000000000000..45cfb00662c98 --- /dev/null +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** A package for policies controlling what can enter caches. */ +package org.opensearch.cache.common.policy; diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 966c0f981241c..99c9c77ff3872 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -8,19 +8,23 @@ package org.opensearch.cache.common.tier; +import org.opensearch.cache.common.policy.TookTimePolicy; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.iterable.Iterables; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -28,6 +32,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.function.Predicate; /** * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap @@ -52,6 +57,7 @@ public class TieredSpilloverCache implements ICache { * Maintains caching tiers in ascending order of cache latency. */ private final List> cacheList; + private final List> policies; TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); @@ -63,7 +69,9 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification notification) { try (ReleasableLock ignore = writeLock.acquire()) { - diskCache.put(notification.getKey(), notification.getValue()); + if (evaluatePolicies(notification.getValue())) { + diskCache.put(notification.getKey(), notification.getValue()); + } } } }) @@ -71,6 +79,8 @@ public void onRemoval(RemovalNotification notification) { .setValueType(builder.cacheConfig.getValueType()) .setSettings(builder.cacheConfig.getSettings()) .setWeigher(builder.cacheConfig.getWeigher()) + .setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix + // coming soon .build(), builder.cacheType, builder.cacheFactories @@ -78,6 +88,8 @@ public void onRemoval(RemovalNotification notification) { ); this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); this.cacheList = Arrays.asList(onHeapCache, diskCache); + + this.policies = builder.policies; // Will never be null; builder initializes it to an empty list } // Package private for testing @@ -192,6 +204,15 @@ private Function getValueFromTieredCache() { }; } + boolean evaluatePolicies(V value) { + for (Predicate policy : policies) { + if (!policy.test(value)) { + return false; + } + } + return true; + } + /** * Factory to create TieredSpilloverCache objects. */ @@ -231,11 +252,21 @@ public ICache create(CacheConfig config, CacheType cacheType, ); } ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName); + + TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD + .getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + .get(settings); + Function cachedResultParser = Objects.requireNonNull( + config.getCachedResultParser(), + "Cached result parser fn can't be null" + ); + return new Builder().setDiskCacheFactory(diskCacheFactory) .setOnHeapCacheFactory(onHeapCacheFactory) .setRemovalListener(config.getRemovalListener()) .setCacheConfig(config) .setCacheType(cacheType) + .addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser)) .build(); } @@ -257,6 +288,7 @@ public static class Builder { private CacheConfig cacheConfig; private CacheType cacheType; private Map cacheFactories; + private final ArrayList> policies = new ArrayList<>(); /** * Default constructor @@ -323,6 +355,26 @@ public Builder setCacheFactories(Map cacheFactorie return this; } + /** + * Set a cache policy to be used to limit access to this cache's disk tier. + * @param policy the policy + * @return builder + */ + public Builder addPolicy(Predicate policy) { + this.policies.add(policy); + return this; + } + + /** + * Set multiple policies to be used to limit access to this cache's disk tier. + * @param policies the policies + * @return builder + */ + public Builder addPolicies(List> policies) { + this.policies.addAll(policies); + return this; + } + /** * Build tiered spillover cache. * @return TieredSpilloverCache diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 6b0620c5fbede..0cc8a711faaf5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -51,6 +51,11 @@ public List> getSettings() { settingList.add( TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); + settingList.add( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ) + ); } return settingList; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index 50b4177f599d1..684307960b8a5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -9,6 +9,9 @@ package org.opensearch.cache.common.tier; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; + +import java.util.concurrent.TimeUnit; import static org.opensearch.common.settings.Setting.Property.NodeScope; @@ -36,6 +39,21 @@ public class TieredSpilloverCacheSettings { (key) -> Setting.simpleString(key, "", NodeScope) ); + /** + * Setting defining the minimum took time for a query to be allowed into the disk cache. + */ + public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting( + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold", + (key) -> Setting.timeSetting( + key, + new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting + TimeValue.ZERO, // Minimum value for this setting + NodeScope + ) + ); + // 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range. + // Will be tuned further with future benchmarks. + /** * Default constructor */ diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java new file mode 100644 index 0000000000000..237c9c7b79db4 --- /dev/null +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.policy; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.opensearch.common.Randomness; +import org.opensearch.common.cache.policy.CachedQueryResult; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Random; +import java.util.function.Function; + +public class TookTimePolicyTests extends OpenSearchTestCase { + private final Function transformationFunction = (data) -> { + try { + return CachedQueryResult.getPolicyValues(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + + private TookTimePolicy getTookTimePolicy(TimeValue threshold) { + return new TookTimePolicy<>(threshold, transformationFunction); + } + + public void testTookTimePolicy() throws Exception { + double threshMillis = 10; + long shortMillis = (long) (0.9 * threshMillis); + long longMillis = (long) (1.5 * threshMillis); + TookTimePolicy tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis)); + BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000); + BytesReference longTime = getValidPolicyInput(longMillis * 1000000); + + boolean shortResult = tookTimePolicy.test(shortTime); + assertFalse(shortResult); + boolean longResult = tookTimePolicy.test(longTime); + assertTrue(longResult); + + TookTimePolicy disabledPolicy = getTookTimePolicy(TimeValue.ZERO); + shortResult = disabledPolicy.test(shortTime); + assertTrue(shortResult); + longResult = disabledPolicy.test(longTime); + assertTrue(longResult); + } + + public void testNegativeOneInput() throws Exception { + // PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason + TookTimePolicy tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); + BytesReference minusOne = getValidPolicyInput(-1L); + assertFalse(tookTimePolicy.test(minusOne)); + } + + public void testInvalidThreshold() throws Exception { + assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE)); + } + + private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { + // When it's used in the cache, the policy will receive BytesReferences which come from + // serializing a CachedQueryResult. + CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos); + BytesStreamOutput out = new BytesStreamOutput(); + cachedQueryResult.writeToNoId(out); + return out.bytes(); + } + + private QuerySearchResult getQSR() { + // We can't mock the QSR with mockito because the class is final. Construct a real one + QuerySearchResult mockQSR = new QuerySearchResult(); + + // duplicated from DfsQueryPhaseTests.java + mockQSR.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + return mockQSR; + } + + private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException { + Random rand = Randomness.get(); + byte[] bytes = new byte[numBytes]; + rand.nextBytes(bytes); + out.writeBytes(bytes); + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index c9608b7184d2a..3e4fb0efd092e 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -13,16 +13,20 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -31,6 +35,8 @@ import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Predicate; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; @@ -121,6 +127,12 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) .setSettings(settings) + .setCachedResultParser(new Function() { + @Override + public CachedQueryResult.PolicyValues apply(String s) { + return new CachedQueryResult.PolicyValues(20_000_000L); + } + }) // Values will always appear to have taken 20_000_000 ns = 20 ms to compute .build(), CacheType.INDICES_REQUEST_CACHE, Map.of( @@ -835,6 +847,210 @@ public void onRemoval(RemovalNotification notification) { } } + public void testDiskTierPolicies() throws Exception { + // For policy function, allow if what it receives starts with "a" and string is even length + ArrayList> policies = new ArrayList<>(); + policies.add(new AllowFirstLetterA()); + policies.add(new AllowEvenLengths()); + + int keyValueSize = 50; + int onHeapCacheSize = 0; + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + keyValueSize, + 100, + removalListener, + Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * 50 + "b" + ) + .build(), + 0, + policies + ); + + Map keyValuePairs = new HashMap<>(); + Map expectedOutputs = new HashMap<>(); + keyValuePairs.put("key1", "abcd"); + expectedOutputs.put("key1", true); + keyValuePairs.put("key2", "abcde"); + expectedOutputs.put("key2", false); + keyValuePairs.put("key3", "bbc"); + expectedOutputs.put("key3", false); + keyValuePairs.put("key4", "ab"); + expectedOutputs.put("key4", true); + keyValuePairs.put("key5", ""); + expectedOutputs.put("key5", false); + + LoadAwareCacheLoader loader = new LoadAwareCacheLoader() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(String key) throws Exception { + isLoaded = true; + return keyValuePairs.get(key); + } + }; + + for (String key : keyValuePairs.keySet()) { + Boolean expectedOutput = expectedOutputs.get(key); + String value = tieredSpilloverCache.computeIfAbsent(key, loader); + assertEquals(keyValuePairs.get(key), value); + String result = tieredSpilloverCache.get(key); + if (expectedOutput) { + // Should retrieve from disk tier if it was accepted + assertEquals(keyValuePairs.get(key), result); + } else { + // Should miss as heap tier size = 0 and the policy rejected it + assertNull(result); + } + } + } + + public void testTookTimePolicyFromFactory() throws Exception { + // Mock took time by passing this map to the policy info wrapper fn + // The policy inspects values, not keys, so this is a map from values -> took time + Map tookTimeMap = new HashMap<>(); + tookTimeMap.put("a", 10_000_000L); + tookTimeMap.put("b", 0L); + tookTimeMap.put("c", 99_999_999L); + tookTimeMap.put("d", null); + tookTimeMap.put("e", -1L); + tookTimeMap.put("f", 8_888_888L); + long timeValueThresholdNanos = 10_000_000L; + + Map keyValueMap = Map.of("A", "a", "B", "b", "C", "c", "D", "d", "E", "e", "F", "f"); + + // Most of setup duplicated from testComputeIfAbsentWithFactoryBasedCacheCreation() + int onHeapCacheSize = randomIntBetween(tookTimeMap.size() + 1, tookTimeMap.size() + 30); + int diskCacheSize = tookTimeMap.size(); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + + // Set the desired settings needed to create a TieredSpilloverCache object with INDICES_REQUEST_CACHE cacheType. + Settings settings = Settings.builder() + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + MockDiskCache.MockDiskCacheFactory.NAME + ) + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + new TimeValue(timeValueThresholdNanos / 1_000_000) + ) + .build(); + + ICache tieredSpilloverICache = new TieredSpilloverCache.TieredSpilloverCacheFactory().create( + new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setRemovalListener(removalListener) + .setSettings(settings) + .setMaxSizeInBytes(onHeapCacheSize * keyValueSize) + .setCachedResultParser(new Function() { + @Override + public CachedQueryResult.PolicyValues apply(String s) { + return new CachedQueryResult.PolicyValues(tookTimeMap.get(s)); + } + }) + .build(), + CacheType.INDICES_REQUEST_CACHE, + Map.of( + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + ) + ); + + TieredSpilloverCache tieredSpilloverCache = (TieredSpilloverCache) tieredSpilloverICache; + + // First add all our values to the on heap cache + for (String key : tookTimeMap.keySet()) { + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader(keyValueMap)); + } + assertEquals(tookTimeMap.size(), tieredSpilloverCache.count()); + + // Ensure all these keys get evicted from the on heap tier by adding > heap tier size worth of random keys + for (int i = 0; i < onHeapCacheSize; i++) { + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), getLoadAwareCacheLoader(keyValueMap)); + } + ICache onHeapCache = tieredSpilloverCache.getOnHeapCache(); + for (String key : tookTimeMap.keySet()) { + assertNull(onHeapCache.get(key)); + } + + // Now the original keys should be in the disk tier if the policy allows them, or misses if not + for (String key : tookTimeMap.keySet()) { + String computedValue = tieredSpilloverCache.get(key); + String mapValue = keyValueMap.get(key); + Long tookTime = tookTimeMap.get(mapValue); + if (tookTime != null && tookTime > timeValueThresholdNanos) { + // expect a hit + assertNotNull(computedValue); + } else { + // expect a miss + assertNull(computedValue); + } + } + } + + public void testMinimumThresholdSettingValue() throws Exception { + // Confirm we can't set TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD to below + // TimeValue.ZERO (for example, MINUS_ONE) + Setting concreteSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD + .getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()); + TimeValue validDuration = new TimeValue(0, TimeUnit.MILLISECONDS); + Settings validSettings = Settings.builder().put(concreteSetting.getKey(), validDuration).build(); + + Settings belowThresholdSettings = Settings.builder().put(concreteSetting.getKey(), TimeValue.MINUS_ONE).build(); + + assertThrows(IllegalArgumentException.class, () -> concreteSetting.get(belowThresholdSettings)); + assertEquals(validDuration, concreteSetting.get(validSettings)); + } + + private static class AllowFirstLetterA implements Predicate { + @Override + public boolean test(String data) { + try { + return (data.charAt(0) == 'a'); + } catch (StringIndexOutOfBoundsException e) { + return false; + } + } + } + + private static class AllowEvenLengths implements Predicate { + @Override + public boolean test(String data) { + return data.length() % 2 == 0; + } + } + private LoadAwareCacheLoader getLoadAwareCacheLoader() { return new LoadAwareCacheLoader<>() { boolean isLoaded = false; @@ -852,12 +1068,45 @@ public boolean isLoaded() { }; } + private LoadAwareCacheLoader getLoadAwareCacheLoader(Map keyValueMap) { + return new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + String mapValue = keyValueMap.get(key); + if (mapValue == null) { + mapValue = UUID.randomUUID().toString(); + } + return mapValue; + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + private TieredSpilloverCache intializeTieredSpilloverCache( int keyValueSize, int diskCacheSize, RemovalListener removalListener, Settings settings, long diskDeliberateDelay + + ) { + return intializeTieredSpilloverCache(keyValueSize, diskCacheSize, removalListener, settings, diskDeliberateDelay, null); + } + + private TieredSpilloverCache intializeTieredSpilloverCache( + int keyValueSize, + int diskCacheSize, + RemovalListener removalListener, + Settings settings, + long diskDeliberateDelay, + List> policies ) { ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) @@ -877,11 +1126,16 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); - return new TieredSpilloverCache.Builder().setCacheType(CacheType.INDICES_REQUEST_CACHE) + TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) .setRemovalListener(removalListener) .setOnHeapCacheFactory(onHeapCacheFactory) .setDiskCacheFactory(mockDiskCacheFactory) - .setCacheConfig(cacheConfig) - .build(); + .setCacheConfig(cacheConfig); + if (policies != null) { + builder.addPolicies(policies); + } + return builder.build(); } } diff --git a/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java new file mode 100644 index 0000000000000..0a98542a05bb7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/policy/CachedQueryResult.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.policy; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.search.query.QuerySearchResult; + +import java.io.IOException; + +/** + * A class containing a QuerySearchResult used in a cache, as well as information needed for all cache policies + * to decide whether to admit a given BytesReference. Also handles serialization/deserialization of the underlying QuerySearchResult, + * which is all that is needed outside the cache. At policy checking time, this spares us from having to create an entire + * short-lived QuerySearchResult object just to read a few values. + * @opensearch.internal + */ +public class CachedQueryResult { + private final PolicyValues policyValues; + private final QuerySearchResult qsr; + + public CachedQueryResult(QuerySearchResult qsr, long tookTimeNanos) { + this.qsr = qsr; + this.policyValues = new PolicyValues(tookTimeNanos); + } + + // Retrieve only took time from a serialized CQR, without creating a short-lived QuerySearchResult or CachedQueryResult object. + public static PolicyValues getPolicyValues(BytesReference serializedCQR) throws IOException { + StreamInput in = serializedCQR.streamInput(); + return new PolicyValues(in); + } + + // Retrieve only the QSR from a serialized CQR, and load it into an existing QSR object discarding the took time which isn't needed + // outside the cache + public static void loadQSR( + BytesReference serializedCQR, + QuerySearchResult qsr, + ShardSearchContextId id, + NamedWriteableRegistry registry + ) throws IOException { + StreamInput in = new NamedWriteableAwareStreamInput(serializedCQR.streamInput(), registry); + PolicyValues pv = new PolicyValues(in); // Read and discard PolicyValues + qsr.readFromWithId(id, in); + } + + public void writeToNoId(StreamOutput out) throws IOException { + policyValues.writeTo(out); + qsr.writeToNoId(out); + } + + /** + * A class containing information needed for all cache policies + * to decide whether to admit a given value. + */ + public static class PolicyValues implements Writeable { + final long tookTimeNanos; + // More values can be added here as they're needed for future policies + + public PolicyValues(long tookTimeNanos) { + this.tookTimeNanos = tookTimeNanos; + } + + public PolicyValues(StreamInput in) throws IOException { + this.tookTimeNanos = in.readZLong(); + } + + public long getTookTimeNanos() { + return tookTimeNanos; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(tookTimeNanos); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/policy/package-info.java b/server/src/main/java/org/opensearch/common/cache/policy/package-info.java new file mode 100644 index 0000000000000..ce9c2f62d7da2 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/policy/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** A package for policies controlling what can enter caches. */ +package org.opensearch.common.cache.policy; diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index fa82e9be72e6e..6ecb752f91fb9 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -10,9 +10,11 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import java.util.function.Function; import java.util.function.ToLongBiFunction; /** @@ -42,6 +44,8 @@ public class CacheConfig { private final RemovalListener removalListener; + /** A function which extracts policy-relevant information, such as took time, from values, to allow inspection by policies if present. */ + private Function cachedResultParser; /** * Max size in bytes for the cache. This is needed for backward compatibility. */ @@ -58,6 +62,7 @@ private CacheConfig(Builder builder) { this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.cachedResultParser = builder.cachedResultParser; this.maxSizeInBytes = builder.maxSizeInBytes; this.expireAfterAccess = builder.expireAfterAccess; } @@ -82,6 +87,10 @@ public ToLongBiFunction getWeigher() { return weigher; } + public Function getCachedResultParser() { + return cachedResultParser; + } + public Long getMaxSizeInBytes() { return maxSizeInBytes; } @@ -106,6 +115,7 @@ public static class Builder { private RemovalListener removalListener; private ToLongBiFunction weigher; + private Function cachedResultParser; private long maxSizeInBytes; @@ -138,6 +148,11 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } + public Builder setCachedResultParser(Function function) { + this.cachedResultParser = function; + return this; + } + public Builder setMaxSizeInBytes(long sizeInBytes) { this.maxSizeInBytes = sizeInBytes; return this; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 92fb278c946f1..d22f131853a78 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -44,6 +44,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -136,6 +137,14 @@ public final class IndicesRequestCache implements RemovalListener { + try { + return CachedQueryResult.getPolicyValues(bytesReference); + } catch (IOException e) { + // Set took time to -1, which will always be rejected by the policy. + return new CachedQueryResult.PolicyValues(-1); + } + }) .build(), CacheType.INDICES_REQUEST_CACHE ); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 6a01da0b2b5be..bbe93d4c6486d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -64,6 +64,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; @@ -84,9 +85,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -1702,16 +1701,20 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q boolean[] loadedFromCache = new boolean[] { true }; BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> { + long beforeQueryPhase = System.nanoTime(); queryPhase.execute(context); - context.queryResult().writeToNoId(out); + // Write relevant info for cache tier policies before the whole QuerySearchResult, so we don't have to read + // the whole QSR into memory when we decide whether to allow it into a particular cache tier based on took time/other info + CachedQueryResult cachedQueryResult = new CachedQueryResult(context.queryResult(), System.nanoTime() - beforeQueryPhase); + cachedQueryResult.writeToNoId(out); loadedFromCache[0] = false; }); if (loadedFromCache[0]) { // restore the cached query result into the context final QuerySearchResult result = context.queryResult(); - StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); - result.readFromWithId(context.id(), in); + // Load the cached QSR into result, discarding values used only in the cache + CachedQueryResult.loadQSR(bytesReference, result, context.id(), namedWriteableRegistry); result.setSearchShardTarget(context.shardTarget()); } else if (context.queryResult().searchTimedOut()) { // we have to invalidate the cache entry if we cached a query result form a request that timed out.