Skip to content

Commit 0e1628c

Browse files
Gagan6164Aditya Khera
authored andcommitted
Replace ChildMemoryCircuitBreaker with NoopCircuitBreaker for File Cache. (opensearch-project#19166)
Signed-off-by: Gagan Singh Saini <gagasa@amazon.com>
1 parent 19b16ad commit 0e1628c

File tree

12 files changed

+29
-120
lines changed

12 files changed

+29
-120
lines changed

benchmarks/src/main/java/org/opensearch/benchmark/store/remote/filecache/FileCacheBenchmark.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
package org.opensearch.benchmark.store.remote.filecache;
1010

1111
import org.apache.lucene.store.IndexInput;
12-
import org.opensearch.core.common.breaker.CircuitBreaker;
13-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
1412
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
1513
import org.opensearch.index.store.remote.filecache.FileCache;
1614
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
@@ -93,8 +91,7 @@ public static class CacheParameters {
9391
public void setup() {
9492
fileCache = FileCacheFactory.createConcurrentLRUFileCache(
9593
(long) maximumNumberOfEntries * INDEX_INPUT.length(),
96-
concurrencyLevel,
97-
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
94+
concurrencyLevel
9895
);
9996
for (long i = 0; i < maximumNumberOfEntries; i++) {
10097
final Path key = Paths.get(Long.toString(i));

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.opensearch.common.SetOnce;
1515
import org.opensearch.common.annotation.PublicApi;
1616
import org.opensearch.core.common.breaker.CircuitBreaker;
17-
import org.opensearch.core.common.breaker.CircuitBreakingException;
1817
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats.FileCacheStatsType;
1918
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
2019
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
@@ -61,11 +60,18 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
6160
private static final Logger logger = LogManager.getLogger(FileCache.class);
6261
private final SegmentedCache<Path, CachedIndexInput> theCache;
6362

64-
private final CircuitBreaker circuitBreaker;
63+
private final CircuitBreaker circuitBreaker = null;
6564

65+
/**
66+
* @deprecated Use {@link FileCache(SegmentedCache<Path, CachedIndexInput>)}. CircuitBreaker parameter is not used.
67+
*/
68+
@Deprecated(forRemoval = true)
6669
public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
67-
this.theCache = cache;
68-
this.circuitBreaker = circuitBreaker;
70+
this(cache);
71+
}
72+
73+
public FileCache(SegmentedCache<Path, CachedIndexInput> theCache) {
74+
this.theCache = theCache;
6975
}
7076

7177
public long capacity() {
@@ -74,7 +80,6 @@ public long capacity() {
7480

7581
@Override
7682
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
77-
checkParentBreaker();
7883
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
7984
return cachedIndexInput;
8085
}
@@ -84,7 +89,6 @@ public CachedIndexInput compute(
8489
Path key,
8590
BiFunction<? super Path, ? super CachedIndexInput, ? extends CachedIndexInput> remappingFunction
8691
) {
87-
checkParentBreaker();
8892
CachedIndexInput cachedIndexInput = theCache.compute(key, remappingFunction);
8993
return cachedIndexInput;
9094
}
@@ -204,22 +208,6 @@ public void closeIndexInputReferences() {
204208
theCache.closeIndexInputReferences();
205209
}
206210

207-
/**
208-
* Ensures that the PARENT breaker is not tripped when an entry is added to the cache
209-
*/
210-
private void checkParentBreaker() {
211-
try {
212-
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
213-
} catch (CircuitBreakingException ex) {
214-
throw new CircuitBreakingException(
215-
"Unable to create file cache entries",
216-
ex.getBytesWanted(),
217-
ex.getByteLimit(),
218-
ex.getDurability()
219-
);
220-
}
221-
}
222-
223211
/**
224212
* Restores the file cache instance performing a folder scan of the
225213
* {@link org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory#LOCAL_STORE_LOCATION}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.index.store.remote.filecache;
1010

1111
import org.opensearch.common.cache.RemovalReason;
12-
import org.opensearch.core.common.breaker.CircuitBreaker;
1312
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
1413

1514
import java.nio.file.Files;
@@ -37,12 +36,12 @@
3736
*/
3837
public class FileCacheFactory {
3938

40-
public static FileCache createConcurrentLRUFileCache(long capacity, CircuitBreaker circuitBreaker) {
41-
return new FileCache(createDefaultBuilder().capacity(capacity).build(), circuitBreaker);
39+
public static FileCache createConcurrentLRUFileCache(long capacity) {
40+
return new FileCache(createDefaultBuilder().capacity(capacity).build());
4241
}
4342

44-
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel, CircuitBreaker circuitBreaker) {
45-
return new FileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build(), circuitBreaker);
43+
public static FileCache createConcurrentLRUFileCache(long capacity, int concurrencyLevel) {
44+
return new FileCache(createDefaultBuilder().capacity(capacity).concurrencyLevel(concurrencyLevel).build());
4645
}
4746

4847
private static SegmentedCache.Builder<Path, CachedIndexInput> createDefaultBuilder() {

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -803,8 +803,8 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
803803
pluginCircuitBreakers,
804804
settingsModule.getClusterSettings()
805805
);
806-
// File cache will be initialized by the node once circuit breakers are in place.
807-
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
806+
807+
initializeFileCache(settings);
808808

809809
pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
810810
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
@@ -2353,7 +2353,7 @@ DiscoveryNode getNode() {
23532353
* If the user doesn't configure the cache size, it fails if the node is a data + warm node.
23542354
* Else it configures the size to 80% of total capacity for a dedicated warm node, if not explicitly defined.
23552355
*/
2356-
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
2356+
private void initializeFileCache(Settings settings) throws IOException {
23572357
if (DiscoveryNode.isWarmNode(settings) == false) {
23582358
return;
23592359
}
@@ -2378,7 +2378,7 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake
23782378
throw new SettingsException("Cache size must be larger than zero and less than total capacity");
23792379
}
23802380

2381-
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
2381+
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity);
23822382
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
23832383
ForkJoinPool loadFileCacheThreadpool = new ForkJoinPool(
23842384
Runtime.getRuntime().availableProcessors(),

server/src/test/java/org/opensearch/action/admin/indices/cache/clear/TransportClearIndicesCacheActionTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.opensearch.cluster.routing.ShardRouting;
1919
import org.opensearch.cluster.service.ClusterService;
2020
import org.opensearch.common.settings.Settings;
21-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
2221
import org.opensearch.core.index.shard.ShardId;
2322
import org.opensearch.core.rest.RestStatus;
2423
import org.opensearch.env.Environment;
@@ -85,7 +84,7 @@ public void testOnShardOperation() throws IOException {
8584
when(shardRouting.shardId()).thenReturn(shardId);
8685
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
8786
final Path cacheEntryPath = shardPath.getDataPath();
88-
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 16, new NoopCircuitBreaker(""));
87+
final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 16);
8988

9089
when(testNode.fileCache()).thenReturn(fileCache);
9190
when(testNode.getNodeEnvironment()).thenReturn(nodeEnvironment);

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
import org.apache.lucene.store.IOContext;
1616
import org.apache.lucene.store.IndexInput;
1717
import org.apache.lucene.store.IndexOutput;
18-
import org.opensearch.core.common.breaker.CircuitBreaker;
19-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
2018
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
2119
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
2220
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
@@ -65,7 +63,7 @@ public void setup() throws IOException {
6563
remoteSegmentStoreDirectory.init();
6664
localDirectory = FSDirectory.open(createTempDir());
6765
removeExtraFSFiles();
68-
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
66+
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY);
6967
compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache, threadPool);
7068
addFilesToDirectory(LOCAL_FILES);
7169
}

server/src/test/java/org/opensearch/index/store/DefaultCompositeDirectoryFactoryTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.apache.lucene.store.Directory;
1212
import org.apache.lucene.store.FSDirectory;
1313
import org.opensearch.common.settings.Settings;
14-
import org.opensearch.core.common.breaker.CircuitBreaker;
15-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
1614
import org.opensearch.core.index.shard.ShardId;
1715
import org.opensearch.index.IndexSettings;
1816
import org.opensearch.index.shard.ShardPath;
@@ -45,7 +43,7 @@ public void setup() throws IOException {
4543
shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
4644
localDirectoryFactory = mock(IndexStorePlugin.DirectoryFactory.class);
4745
localDirectory = FSDirectory.open(createTempDir());
48-
fileCache = FileCacheFactory.createConcurrentLRUFileCache(10000, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
46+
fileCache = FileCacheFactory.createConcurrentLRUFileCache(10000);
4947
when(localDirectoryFactory.newDirectory(indexSettings, shardPath)).thenReturn(localDirectory);
5048
setupRemoteSegmentStoreDirectory();
5149
populateMetadata();

server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheCleanerTests.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
import org.opensearch.cluster.metadata.IndexMetadata;
1616
import org.opensearch.cluster.routing.AllocationId;
1717
import org.opensearch.common.settings.Settings;
18-
import org.opensearch.core.common.breaker.CircuitBreaker;
19-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
2018
import org.opensearch.core.index.shard.ShardId;
2119
import org.opensearch.env.NodeEnvironment;
2220
import org.opensearch.gateway.WriteStateException;
@@ -77,11 +75,7 @@ public class FileCacheCleanerTests extends OpenSearchTestCase {
7775

7876
private static final Logger logger = LogManager.getLogger(FileCache.class);
7977

80-
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
81-
1024 * 1024,
82-
1,
83-
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
84-
);
78+
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 1);
8579
private final Map<ShardId, Path> files = new HashMap<>();
8680
private NodeEnvironment env;
8781
private FileCacheCleaner cleaner;

server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java

Lines changed: 2 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
import org.apache.lucene.store.IndexInput;
1616
import org.opensearch.common.SetOnce;
1717
import org.opensearch.common.SuppressForbidden;
18-
import org.opensearch.common.breaker.TestCircuitBreaker;
19-
import org.opensearch.core.common.breaker.CircuitBreaker;
20-
import org.opensearch.core.common.breaker.CircuitBreakingException;
21-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
2218
import org.opensearch.env.NodeEnvironment;
2319
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
2420
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
@@ -52,17 +48,7 @@ public void init() throws Exception {
5248
}
5349

5450
private FileCache createFileCache(long capacity) {
55-
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
56-
}
57-
58-
private FileCache createFileCache(long capacity, CircuitBreaker circuitBreaker) {
59-
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, circuitBreaker);
60-
}
61-
62-
private FileCache createCircuitBreakingFileCache(long capacity) {
63-
TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
64-
testCircuitBreaker.startBreaking();
65-
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, testCircuitBreaker);
51+
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL);
6652
}
6753

6854
private Path createPath(String middle) {
@@ -182,13 +168,6 @@ public void testPutThrowException() {
182168
});
183169
}
184170

185-
public void testPutThrowCircuitBreakingException() {
186-
FileCache fileCache = createCircuitBreakingFileCache(MEGA_BYTES);
187-
Path path = createPath("0");
188-
assertThrows(CircuitBreakingException.class, () -> fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES)));
189-
assertNull(fileCache.get(path));
190-
}
191-
192171
public void testCompute() {
193172
FileCache fileCache = createFileCache(MEGA_BYTES);
194173
Path path = createPath("0");
@@ -206,27 +185,6 @@ public void testComputeThrowException() {
206185
});
207186
}
208187

209-
public void testComputeThrowCircuitBreakingException() {
210-
FileCache fileCache = createCircuitBreakingFileCache(MEGA_BYTES);
211-
Path path = createPath("0");
212-
assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(8 * MEGA_BYTES)));
213-
assertNull(fileCache.get(path));
214-
}
215-
216-
public void testEntryNotRemovedCircuitBreaker() {
217-
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
218-
FileCache fileCache = createFileCache(MEGA_BYTES, circuitBreaker);
219-
Path path = createPath("0");
220-
fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES));
221-
// put should succeed since circuit breaker hasn't tripped yet
222-
assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES);
223-
circuitBreaker.startBreaking();
224-
// compute should throw CircuitBreakingException but shouldn't remove entry already present
225-
assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(2 * MEGA_BYTES)));
226-
assertNotNull(fileCache.get(path));
227-
assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES);
228-
}
229-
230188
public void testRemove() {
231189
FileCache fileCache = createFileCache(MEGA_BYTES);
232190
for (int i = 0; i < 4; i++) {
@@ -347,11 +305,7 @@ public void testPruneWithPredicate() {
347305
}
348306

349307
public void testUsage() {
350-
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(
351-
16 * MEGA_BYTES,
352-
1,
353-
new NoopCircuitBreaker(CircuitBreaker.REQUEST)
354-
);
308+
FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(16 * MEGA_BYTES, 1);
355309
putAndDecRef(fileCache, 0, 16 * MEGA_BYTES);
356310

357311
long expectedCacheUsage = 16 * MEGA_BYTES;

server/src/test/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInputTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
import org.apache.lucene.store.IOContext;
1515
import org.apache.lucene.store.IndexInput;
1616
import org.apache.lucene.store.IndexOutput;
17-
import org.opensearch.core.common.breaker.CircuitBreaker;
18-
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
1917
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
2018
import org.opensearch.test.OpenSearchTestCase;
2119
import org.junit.Before;
@@ -45,7 +43,7 @@ public void setup() throws IOException {
4543
indexOutput.close();
4644
filePath = basePath.resolve(TEST_FILE);
4745
underlyingIndexInput = fsDirectory.openInput(TEST_FILE, IOContext.DEFAULT);
48-
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
46+
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY);
4947
}
5048

5149
protected void setupIndexInputAndAddToFileCache() {

0 commit comments

Comments
 (0)