Skip to content

Commit

Permalink
[Tiered Caching] Stats rework (1/3): Interfaces and implementations f…
Browse files Browse the repository at this point in the history
…or individual tiers (opensearch-project#12531)

As part of tiered caching stats, changes the common ICache interface to use ICacheKey as its key. This key contains
dimensions (for example, shard ID, index name, or tier) that can be used to aggregate stats. Also changes the
CacheStats interface to store the necessary cache stats, and to support getting stats either as a total or aggregated by
these dimensions.

Integrates these changes with OpenSearchOnHeapCache and EhcacheDiskCache. The stats implementation for the
TieredSpilloverCache will be in a followup PR.

---------

Signed-off-by: Peter Alfonsi <petealft@amazon.com>
Co-authored-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
peteralfonsi and Peter Alfonsi committed Aug 30, 2024
1 parent 40a8d34 commit b0de406
Show file tree
Hide file tree
Showing 24 changed files with 2,410 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -54,7 +56,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;

// The listener for removals from the spillover cache as a whole
// TODO: In TSC stats PR, each tier will have its own separate removal listener.
private final RemovalListener<ICacheKey<K>, V> removalListener;
private final List<String> dimensionNames;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
Expand All @@ -70,9 +76,9 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
Expand All @@ -87,6 +93,7 @@ && evaluatePolicies(notification.getValue())) {
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setDimensionNames(builder.cacheConfig.getDimensionNames())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
.setClusterSettings(builder.cacheConfig.getClusterSettings())
Expand All @@ -97,7 +104,7 @@ && evaluatePolicies(notification.getValue())) {
);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

Expand All @@ -112,19 +119,19 @@ ICache<K, V> getDiskCache() {
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
Expand All @@ -141,7 +148,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
Expand All @@ -167,9 +174,9 @@ public void invalidateAll() {
*/
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<K> keys() {
Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<K>(iterables);
public Iterable<ICacheKey<K>> keys() {
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<ICacheKey<K>>(iterables);
}

@Override
Expand Down Expand Up @@ -197,7 +204,12 @@ public void close() throws IOException {
}
}

private Function<K, V> getValueFromTieredCache() {
@Override
public ImmutableCacheStatsHolder stats() {
return null; // TODO: in TSC stats PR
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
Expand Down Expand Up @@ -354,7 +366,7 @@ public String getCacheName() {
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private RemovalListener<ICacheKey<K>, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
Expand Down Expand Up @@ -390,7 +402,7 @@ public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
public Builder<K, V> setRemovalListener(RemovalListener<ICacheKey<K>, V> removalListener) {
this.removalListener = removalListener;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand All @@ -25,27 +27,27 @@

public class MockDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
Map<ICacheKey<K>, V> cache;
int maxSize;
long delay;

private final RemovalListener<K, V> removalListener;
private final RemovalListener<ICacheKey<K>, V> removalListener;

public MockDiskCache(int maxSize, long delay, RemovalListener<K, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<K, V>();
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
}
Expand All @@ -58,7 +60,7 @@ public void put(K key, V value) {
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
Expand All @@ -70,7 +72,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
this.cache.remove(key);
}

Expand All @@ -80,7 +82,7 @@ public void invalidateAll() {
}

@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return () -> new CacheKeyIterator<>(cache, removalListener);
}

Expand All @@ -92,6 +94,11 @@ public long count() {
@Override
public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public void close() {

Expand Down
Loading

0 comments on commit b0de406

Please sign in to comment.