Skip to content

Commit

Permalink
Make cacheEntry.getIndexInput() privileged when fetching blobs from r…
Browse files Browse the repository at this point in the history
…emote snapshot (#16544) (#16582)

* Make cacheEntry.getIndexInput() privileged when fetching blobs from remote store



* Rebase



* Spotless apply



* Clean up doPrivileged calls



* Comment



* Move fetchBlob to PrivilegedExceptionAction. Catch and unwrap IOException.



* Unused import



* Update server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java




* Typo 'thrown'. Catch and throw unknown exception as IOException.



---------




(cherry picked from commit 4213cc2)

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn <finnegancarroll94@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent 6111d05 commit f1b8799
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove resource usages object from search response headers ([#16532](https://github.com/opensearch-project/OpenSearch/pull/16532))
- Support retrieving doc values of unsigned long field ([#16543](https://github.com/opensearch-project/OpenSearch/pull/16543))
- Fix rollover alias supports restored searchable snapshot index([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Fix permissions error on scripted query against remote snapshot ([#16544](https://github.com/opensearch-project/OpenSearch/pull/16544))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -56,39 +57,52 @@ public TransferManager(final StreamReader streamReader, final FileCache fileCach

/**
* Given a blobFetchRequestList, return it's corresponding IndexInput.
*
* Note: Scripted queries/aggs may trigger a blob fetch within a new security context.
* As such the following operations require elevated permissions.
*
* cacheEntry.getIndexInput() downloads new blobs from the remote store to local fileCache.
* fileCache.compute() as inserting into the local fileCache may trigger an eviction.
*
* @param blobFetchRequest to fetch
* @return future of IndexInput augmented with internal caching maintenance tasks
*/
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {

final Path key = blobFetchRequest.getFilePath();
logger.trace("fetchBlob called for {}", key.toString());

// We need to do a privileged action here in order to fetch from remote
// and write/evict from local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction<CachedIndexInput>) () -> {
return fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> {
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
// decrement this reference _after_ creating the clone to be returned.
try {
return cacheEntry.getIndexInput().clone();
} finally {
fileCache.decRef(key);
}
});
});

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
// decrement this reference _after_ creating the clone to be returned.
try {
return cacheEntry.getIndexInput().clone();
} finally {
fileCache.decRef(key);
} catch (PrivilegedActionException e) {
final Exception cause = e.getException();
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new IOException(cause);
}
}
}

Expand Down

0 comments on commit f1b8799

Please sign in to comment.