diff --git a/CHANGELOG.md b/CHANGELOG.md index 06a749df1b3a7..409e152ac60af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394)) - Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069)) - Fix terms query on wildcard field returns nothing ([#15607](https://github.com/opensearch-project/OpenSearch/pull/15607)) +- Fix remote snapshot file_cache exceeding capacity ([#15077](https://github.com/opensearch-project/OpenSearch/pull/15077)) ### Security diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index f07c4832d982c..94c25202ac90c 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -95,6 +95,19 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio @SuppressWarnings("removal") private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { try { + // This local file cache is ref counted and may not strictly enforce configured capacity. + // If we find available capacity is exceeded, deny further BlobFetchRequests. + if (fileCache.capacity() < fileCache.usage().usage()) { + fileCache.prune(); + throw new IOException( + "Local file cache capacity (" + + fileCache.capacity() + + ") exceeded (" + + fileCache.usage().usage() + + ") - BlobFetchRequest failed: " + + request.getFilePath() + ); + } if (Files.exists(request.getFilePath()) == false) { logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); try ( diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 810a4c336fdf7..1eae5119ab462 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -99,7 +99,7 @@ public void testConcurrentAccess() throws Exception { } } - public void testFetchBlobWithConcurrentCacheEvictions() throws Exception { + public void testFetchBlobWithConcurrentCacheEvictions() { // Submit 256 tasks to an executor with 16 threads that will each randomly // request one of eight blobs. Given that the cache can only hold two // blobs this will lead to a huge amount of contention and thrashing. @@ -114,41 +114,34 @@ public void testFetchBlobWithConcurrentCacheEvictions() throws Exception { try (IndexInput indexInput = fetchBlobWithName(blobname)) { assertIndexInputIsFunctional(indexInput); } + } catch (IOException ignored) { // fetchBlobWithName may fail due to fixed capacity } catch (Exception e) { throw new AssertionError(e); } })); } // Wait for all threads to complete - for (Future future : futures) { - future.get(10, TimeUnit.SECONDS); + try { + for (Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } catch (java.util.concurrent.ExecutionException ignored) { // Index input may be null + } catch (Exception e) { + throw new AssertionError(e); } + } finally { assertTrue(terminate(testRunner)); } MatcherAssert.assertThat("Expected many evictions to happen", fileCache.stats().evictionCount(), greaterThan(0L)); } - public void testUsageExceedsCapacity() throws Exception { - // Fetch resources that exceed the configured capacity of the cache and assert that the - // returned IndexInputs are still functional. - try (IndexInput i1 = fetchBlobWithName("1"); IndexInput i2 = fetchBlobWithName("2"); IndexInput i3 = fetchBlobWithName("3")) { - assertIndexInputIsFunctional(i1); - assertIndexInputIsFunctional(i2); - assertIndexInputIsFunctional(i3); - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB * 3)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3)); - } - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3)); - // Fetch another resource which will trigger an eviction - try (IndexInput i1 = fetchBlobWithName("1")) { - assertIndexInputIsFunctional(i1); - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB)); - } - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB)); + public void testOverflowDisabled() throws Exception { + initializeTransferManager(); + IndexInput i1 = fetchBlobWithName("1"); + IndexInput i2 = fetchBlobWithName("2"); + + assertThrows(IOException.class, () -> { IndexInput i3 = fetchBlobWithName("3"); }); } public void testDownloadFails() throws Exception {