Skip to content

Commit

Permalink
[Tiered caching] Framework changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Oct 19, 2023
1 parent 69f6f4e commit b196615
Show file tree
Hide file tree
Showing 17 changed files with 1,198 additions and 108 deletions.
112 changes: 59 additions & 53 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,68 +422,74 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
value = compute(key, loader);
}
return value;
}

try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}
public V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
completableValue = future.handle(handler);
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.common.cache;

import org.opensearch.indices.TierType;

/**
* Notification when an element is removed from the cache
*
Expand All @@ -42,11 +44,17 @@ public class RemovalNotification<K, V> {
private final K key;
private final V value;
private final RemovalReason removalReason;
private final TierType tierType;

/**
 * Creates a removal notification for the default on-heap tier.
 * Delegates to the tier-aware constructor with {@link TierType#ON_HEAP},
 * preserving backward compatibility for callers unaware of tiers.
 */
public RemovalNotification(K key, V value, RemovalReason removalReason) {
this(key, value, removalReason, TierType.ON_HEAP);
}

/**
 * Creates a removal notification tagged with the cache tier the entry was removed from.
 *
 * @param key the key of the removed entry
 * @param value the value of the removed entry
 * @param removalReason why the entry was removed
 * @param tierType the cache tier (e.g. on-heap, disk) the entry was removed from
 */
public RemovalNotification(K key, V value, RemovalReason removalReason, TierType tierType) {
this.key = key;
this.value = value;
this.removalReason = removalReason;
this.tierType = tierType;
}

public K getKey() {
Expand All @@ -60,4 +68,8 @@ public V getValue() {
public RemovalReason getRemovalReason() {
return removalReason;
}

/**
 * @return the cache tier this entry was removed from (defaults to
 *         {@link TierType#ON_HEAP} when constructed without an explicit tier)
 */
public TierType getTierType() {
return tierType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.lucene.util.Accountable;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.indices.TierType;

import java.util.EnumMap;

/**
* Tracks the portion of the request cache in use for a particular shard.
Expand All @@ -43,30 +46,39 @@
*/
public final class ShardRequestCache {

final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
final CounterMetric hitCount = new CounterMetric();
final CounterMetric missCount = new CounterMetric();
private EnumMap<TierType, StatsHolder> statsHolder = new EnumMap<>(TierType.class);

public ShardRequestCache() {
for (TierType tierType : TierType.values()) {
statsHolder.put(tierType, new StatsHolder());
}
}

public RequestCacheStats stats() {
return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count());
// TODO: Change RequestCacheStats to support disk tier stats.
return new RequestCacheStats(
statsHolder.get(TierType.ON_HEAP).totalMetric.count(),
statsHolder.get(TierType.ON_HEAP).evictionsMetric.count(),
statsHolder.get(TierType.ON_HEAP).hitCount.count(),
statsHolder.get(TierType.ON_HEAP).missCount.count()
);
}

public void onHit() {
hitCount.inc();
public void onHit(TierType tierType) {
statsHolder.get(tierType).hitCount.inc();
}

public void onMiss() {
missCount.inc();
public void onMiss(TierType tierType) {
statsHolder.get(tierType).missCount.inc();
}

public void onCached(Accountable key, BytesReference value) {
totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed());
public void onCached(Accountable key, BytesReference value, TierType tierType) {
statsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed());
}

public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) {
if (evicted) {
evictionsMetric.inc();
statsHolder.get(tierType).evictionsMetric.inc();
}
long dec = 0;
if (key != null) {
Expand All @@ -75,6 +87,14 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
if (value != null) {
dec += value.ramBytesUsed();
}
totalMetric.dec(dec);
statsHolder.get(tierType).totalMetric.dec(dec);
}

/**
 * Per-tier bundle of request-cache statistics counters. {@code ShardRequestCache}
 * keeps one instance per {@link TierType} (in an {@code EnumMap}) so that hits,
 * misses, evictions and memory usage are tracked separately for each tier.
 */
static class StatsHolder {

// count of entries evicted from this tier
final CounterMetric evictionsMetric = new CounterMetric();
// total bytes (key + value ram usage) currently held by this tier; incremented on cache, decremented on removal
final CounterMetric totalMetric = new CounterMetric();
// lookups served from this tier
final CounterMetric hitCount = new CounterMetric();
// lookups that missed this tier
final CounterMetric missCount = new CounterMetric();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,27 @@ abstract class AbstractIndexShardCacheEntity implements IndicesRequestCache.Cach
protected abstract ShardRequestCache stats();

@Override
public final void onCached(IndicesRequestCache.Key key, BytesReference value) {
stats().onCached(key, value);
public final void onCached(IndicesRequestCache.Key key, BytesReference value, TierType tierType) {
stats().onCached(key, value, tierType);
}

@Override
public final void onHit() {
stats().onHit();
public final void onHit(TierType tierType) {
stats().onHit(tierType);
}

@Override
public final void onMiss() {
stats().onMiss();
public final void onMiss(TierType tierType) {
stats().onMiss(tierType);
}

@Override
public final void onRemoval(RemovalNotification<IndicesRequestCache.Key, BytesReference> notification) {
stats().onRemoval(notification.getKey(), notification.getValue(), notification.getRemovalReason() == RemovalReason.EVICTED);
stats().onRemoval(
notification.getKey(),
notification.getValue(),
notification.getRemovalReason() == RemovalReason.EVICTED,
notification.getTierType()
);
}
}
45 changes: 45 additions & 0 deletions server/src/main/java/org/opensearch/indices/CachingTier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.opensearch.common.cache.RemovalListener;

/**
 * Caching tier interface. Can be implemented/extended by concrete classes to provide different flavors of cache like
 * onHeap, disk etc.
 *
 * @param <K> Type of key
 * @param <V> Type of value
 */
public interface CachingTier<K, V> {

/**
 * Looks up the value associated with {@code key}.
 * NOTE(review): miss behavior (null return vs. other signal) is implementation-defined — confirm with implementations.
 */
V get(K key);

/** Associates {@code value} with {@code key} in this tier. */
void put(K key, V value);

/**
 * Returns the value for {@code key}, invoking {@code loader} to compute and cache it if absent.
 *
 * @throws Exception if the loader fails — presumably propagated to the caller; verify against implementations
 */
V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception;

/** Removes the entry for {@code key}, if present. */
void invalidate(K key);

/**
 * Computes a value for {@code key} via {@code loader} and caches it.
 * NOTE(review): distinction from {@link #computeIfAbsent} (recompute even when present?) is not visible here — confirm.
 */
V compute(K key, TieredCacheLoader<K, V> loader) throws Exception;

/** Registers the listener to be notified when entries are removed from this tier. */
void setRemovalListener(RemovalListener<K, V> removalListener);

/** Removes all entries from this tier. */
void invalidateAll();

/** @return an iterable over the keys currently present in this tier */
Iterable<K> keys();

/** @return the number of entries currently in this tier */
int count();

/** @return which tier this cache represents (e.g. on-heap, disk) */
TierType getTierType();

/**
 * Force any outstanding size-based and time-based evictions to occur
 */
default void refresh() {}
}
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/indices/DiskCachingTier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

/**
 * This is specific to disk caching tier and can be used to add methods which are specific to disk tier.
 * Currently a marker interface: disk-only operations (e.g. closing or cleaning up on-disk resources)
 * can be added here later without widening the generic {@code CachingTier} contract.
 *
 * @param <K> Type of key
 * @param <V> Type of value
 */
public interface DiskCachingTier<K, V> extends CachingTier<K, V> {

}
Loading

0 comments on commit b196615

Please sign in to comment.