Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
package org.opensearch.benchmark.store.remote.filecache;

import org.apache.lucene.store.IndexInput;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand Down Expand Up @@ -93,8 +91,7 @@ public static class CacheParameters {
public void setup() {
fileCache = FileCacheFactory.createConcurrentLRUFileCache(
(long) maximumNumberOfEntries * INDEX_INPUT.length(),
concurrencyLevel,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
concurrencyLevel
);
for (long i = 0; i < maximumNumberOfEntries; i++) {
final Path key = Paths.get(Long.toString(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats.FileCacheStatsType;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
Expand Down Expand Up @@ -61,11 +60,18 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
private static final Logger logger = LogManager.getLogger(FileCache.class);
private final SegmentedCache<Path, CachedIndexInput> theCache;

private final CircuitBreaker circuitBreaker;
private final CircuitBreaker circuitBreaker = null;

/**
* @deprecated Use {@link FileCache(SegmentedCache<Path, CachedIndexInput>)}. CircuitBreaker parameter is not used.
*/
@Deprecated(forRemoval = true)
public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
this(cache);
}

public FileCache(SegmentedCache<Path, CachedIndexInput> theCache) {
this.theCache = theCache;
}

public long capacity() {
Expand All @@ -74,7 +80,6 @@ public long capacity() {

@Override
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
// Fail fast if the parent (request-scoped) breaker is already tripped, so no
// new cache entry is admitted while the node is under memory pressure.
checkParentBreaker();
// Delegate storage to the segmented backing cache; its return value is passed
// through unchanged to the caller.
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
return cachedIndexInput;
}
Expand All @@ -84,7 +89,6 @@ public CachedIndexInput compute(
Path key,
BiFunction<? super Path, ? super CachedIndexInput, ? extends CachedIndexInput> remappingFunction
) {
checkParentBreaker();
CachedIndexInput cachedIndexInput = theCache.compute(key, remappingFunction);
return cachedIndexInput;
}
Expand Down Expand Up @@ -204,22 +208,6 @@ public void closeIndexInputReferences() {
theCache.closeIndexInputReferences();
}

/**
 * Ensures that the PARENT breaker is not tripped when an entry is added to the cache.
 * Probes the breaker with a zero-byte estimate: no memory is actually reserved,
 * the call only fails if the breaker has already tripped elsewhere.
 *
 * @throws CircuitBreakingException if the parent breaker is tripped; rethrown with a
 *         cache-specific message while preserving the original limits and durability
 */
private void checkParentBreaker() {
try {
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
} catch (CircuitBreakingException ex) {
// NOTE(review): the caught exception is not chained as a cause — only its
// bytesWanted/byteLimit/durability are copied, so the original stack trace is
// lost. Consider chaining `ex` if a cause-accepting constructor exists.
throw new CircuitBreakingException(
"Unable to create file cache entries",
ex.getBytesWanted(),
ex.getByteLimit(),
ex.getDurability()
);
}
}

/**
* Restores the file cache instance performing a folder scan of the
* {@link org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory#LOCAL_STORE_LOCATION}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.cache.RemovalReason;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;

import java.nio.file.Files;
Expand Down Expand Up @@ -37,12 +36,12 @@
*/
public class FileCacheFactory {

public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
return new FileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
public static FileCache createConcurrentLRUFileCache(long capacity) {
return new FileCache(createDefaultBuilder().capacity(capacity).build());
}

public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
return new FileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) {
return new FileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build());
}

private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -803,8 +803,8 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
pluginCircuitBreakers,
settingsModule.getClusterSettings()
);
// File cache will be initialized by the node once circuit breakers are in place.
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));

initializeFileCache(settings);

pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
Expand Down Expand Up @@ -2353,7 +2353,7 @@ DiscoveryNode getNode() {
* If the user doesn't configure the cache size, it fails if the node is a data + warm node.
* Else it configures the size to 80% of total capacity for a dedicated warm node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
private void initializeFileCache(Settings settings) throws IOException {
if (DiscoveryNode.isWarmNode(settings) == false) {
return;
}
Expand All @@ -2378,7 +2378,7 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake
throw new SettingsException("Cache size must be larger than zero and less than total capacity");
}

this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
ForkJoinPool loadFileCacheThreadpool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.env.Environment;
Expand Down Expand Up @@ -85,7 +84,7 @@ public void testOnShardOperation() throws IOException {
when(shardRouting.shardId()).thenReturn(shardId);
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
final Path cacheEntryPath = shardPath.getDataPath();
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 16, new NoopCircuitBreaker(""));
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 16);

when(testNode.fileCache()).thenReturn(fileCache);
when(testNode.getNodeEnvironment()).thenReturn(nodeEnvironment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
Expand Down Expand Up @@ -65,7 +63,7 @@ public void setup() throws IOException {
remoteSegmentStoreDirectory.init();
localDirectory = FSDirectory.open(createTempDir());
removeExtraFSFiles();
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY);
compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache, threadPool);
addFilesToDirectory(LOCAL_FILES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
Expand Down Expand Up @@ -45,7 +43,7 @@ public void setup() throws IOException {
shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
localDirectoryFactory = mock(IndexStorePlugin.DirectoryFactory.class);
localDirectory = FSDirectory.open(createTempDir());
fileCache = FileCacheFactory.createConcurrentLRUFileCache(10000, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
fileCache = FileCacheFactory.createConcurrentLRUFileCache(10000);
when(localDirectoryFactory.newDirectory(indexSettings, shardPath)).thenReturn(localDirectory);
setupRemoteSegmentStoreDirectory();
populateMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.WriteStateException;
Expand Down Expand Up @@ -77,11 +75,7 @@ public class FileCacheCleanerTests extends OpenSearchTestCase {

private static final Logger logger = LogManager.getLogger(FileCache.class);

private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
1024 * 1024,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 1);
private final Map<ShardId, Path> files = new HashMap<>();
private NodeEnvironment env;
private FileCacheCleaner cleaner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.SetOnce;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.breaker.TestCircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
Expand Down Expand Up @@ -52,17 +48,7 @@ public void init() throws Exception {
}

// Builds a FileCache of the given capacity using the shared test concurrency level
// and a no-op breaker, i.e. a cache whose breaker can never trip.
private FileCache createFileCache(long capacity) {
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
}

// Builds a FileCache of the given capacity with a caller-supplied breaker, letting
// tests inject a breaker (e.g. TestCircuitBreaker) that can be tripped on demand.
private FileCache createFileCache(long capacity, CircuitBreaker circuitBreaker) {
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, circuitBreaker);
}

private FileCache createCircuitBreakingFileCache(long capacity) {
TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
testCircuitBreaker.startBreaking();
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, testCircuitBreaker);
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL);
}

private Path createPath(String middle) {
Expand Down Expand Up @@ -182,13 +168,6 @@ public void testPutThrowException() {
});
}

// Verifies that put() throws CircuitBreakingException when the breaker is already
// tripped, and that the rejected entry is not left behind in the cache.
public void testPutThrowCircuitBreakingException() {
FileCache fileCache = createCircuitBreakingFileCache(MEGA_BYTES);
Path path = createPath("0");
assertThrows(CircuitBreakingException.class, () -> fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES)));
// The failed put must not have inserted anything under this key.
assertNull(fileCache.get(path));
}

public void testCompute() {
FileCache fileCache = createFileCache(MEGA_BYTES);
Path path = createPath("0");
Expand All @@ -206,27 +185,6 @@ public void testComputeThrowException() {
});
}

// Verifies that compute() throws CircuitBreakingException when the breaker is already
// tripped, and that no mapping is created for the key.
public void testComputeThrowCircuitBreakingException() {
FileCache fileCache = createCircuitBreakingFileCache(MEGA_BYTES);
Path path = createPath("0");
assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(8 * MEGA_BYTES)));
// The failed compute must not have inserted anything under this key.
assertNull(fileCache.get(path));
}

// Verifies that tripping the breaker only blocks NEW insertions/updates: an entry
// added before the breaker started breaking stays in the cache untouched.
public void testEntryNotRemovedCircuitBreaker() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
FileCache fileCache = createFileCache(MEGA_BYTES, circuitBreaker);
Path path = createPath("0");
fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES));
// put should succeed since circuit breaker hasn't tripped yet
assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES);
circuitBreaker.startBreaking();
// compute should throw CircuitBreakingException but shouldn't remove entry already present
assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(2 * MEGA_BYTES)));
// Pre-existing entry survives the failed compute, with its original length.
assertNotNull(fileCache.get(path));
assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES);
}

public void testRemove() {
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
Expand Down Expand Up @@ -347,11 +305,7 @@ public void testPruneWithPredicate() {
}

public void testUsage() {
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
16 * MEGA_BYTES,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1);
putAndDecRef(fileCache, 0, 16 * MEGA_BYTES);

long expectedCacheUsage = 16 * MEGA_BYTES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -45,7 +43,7 @@ public void setup() throws IOException {
indexOutput.close();
filePath = basePath.resolve(TEST_FILE);
underlyingIndexInput = fsDirectory.openInput(TEST_FILE, IOContext.DEFAULT);
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY);
}

protected void setupIndexInputAndAddToFileCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand Down Expand Up @@ -44,11 +42,7 @@
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public abstract class TransferManagerTestCase extends OpenSearchTestCase {
protected static final int EIGHT_MB = 1024 * 1024 * 8;
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
EIGHT_MB * 2,
1,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1);
protected MMapDirectory directory;
protected TransferManager transferManager;
protected ThreadPool threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.apache.lucene.util.Constants;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.env.NodeEnvironment;
Expand Down Expand Up @@ -130,11 +128,7 @@ public void testFsCacheInfo() throws IOException {
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ByteSizeValue gbByteSizeValue = new ByteSizeValue(1, ByteSizeUnit.GB);
env.fileCacheNodePath().fileCacheReservedSize = gbByteSizeValue;
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
gbByteSizeValue.getBytes(),
16,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(gbByteSizeValue.getBytes(), 16);
FsProbe probe = new FsProbe(env, fileCache);
FsInfo stats = probe.stats(null);
assertNotNull(stats);
Expand Down Expand Up @@ -170,11 +164,7 @@ public void testFsInfoWhenFileCacheOccupied() throws IOException {
final long totalSpace = adjustForHugeFilesystems(env.fileCacheNodePath().fileStore.getTotalSpace());
ByteSizeValue gbByteSizeValue = new ByteSizeValue(totalSpace, ByteSizeUnit.BYTES);
env.fileCacheNodePath().fileCacheReservedSize = gbByteSizeValue;
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
gbByteSizeValue.getBytes(),
16,
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
);
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(gbByteSizeValue.getBytes(), 16);

FsProbe probe = new FsProbe(env, fileCache);
FsInfo stats = probe.stats(null);
Expand Down
Loading