Centralize common code for footer checksum read in BaseSearchableSnapshotIndexInput class #68902

Merged with 7 commits on Feb 15, 2021
Changes from all commits
File: BaseSearchableSnapshotIndexInput.java

@@ -6,6 +6,8 @@
*/
package org.elasticsearch.index.store;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.IOContext;
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -16,11 +18,16 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray;

public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {

protected final Logger logger;
protected final BlobContainer blobContainer;
protected final FileInfo fileInfo;
protected final IOContext context;
@@ -33,6 +40,7 @@ public abstract class BaseSearchableSnapshotIndexInput
private AtomicBoolean closed;

public BaseSearchableSnapshotIndexInput(
Logger logger,
String resourceDesc,
BlobContainer blobContainer,
FileInfo fileInfo,
@@ -42,6 +50,7 @@ public BaseSearchableSnapshotIndexInput(
long length
) {
super(resourceDesc, context);
this.logger = Objects.requireNonNull(logger);
this.blobContainer = Objects.requireNonNull(blobContainer);
this.fileInfo = Objects.requireNonNull(fileInfo);
this.context = Objects.requireNonNull(context);
@@ -54,25 +63,60 @@ public BaseSearchableSnapshotIndexInput(
this.isClone = false;
}

public BaseSearchableSnapshotIndexInput(
String resourceDesc,
BlobContainer blobContainer,
FileInfo fileInfo,
IOContext context,
IndexInputStats stats,
long offset,
long length,
int bufferSize
) {
this(resourceDesc, blobContainer, fileInfo, context, stats, offset, length);
setBufferSize(bufferSize);
}

@Override
public final long length() {
return length;
}

@Override
protected final void readInternal(ByteBuffer b) throws IOException {
assert assertCurrentThreadIsNotCacheFetchAsync();

// We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often
// executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer.
if (maybeReadChecksumFromFileInfo(b)) {
logger.trace("read footer of file [{}], bypassing all caches", fileInfo.physicalName());
assert b.remaining() == 0L : b.remaining();
return;
}

doReadInternal(b);
}
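
The "reads executed when opening a Directory" mentioned in the comment above are Lucene's footer verification reads. A minimal sketch of such a caller, assuming a hypothetical directory instance, file name and helper method name (CodecUtil.retrieveChecksum and IOContext.READONCE are real Lucene APIs):

import java.io.IOException;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

// Sketch only: opening a file and asking for its expected checksum reads exactly the last
// CodecUtil.footerLength() == 16 bytes, which readInternal() above can now serve from the
// checksum stored in the snapshot's FileInfo instead of hitting the cache or the blob store.
static long readExpectedChecksum(Directory directory, String fileName) throws IOException {
    try (IndexInput in = directory.openInput(fileName, IOContext.READONCE)) {
        return CodecUtil.retrieveChecksum(in); // seeks to length() - footerLength() and reads the footer
    }
}
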

protected abstract void doReadInternal(ByteBuffer b) throws IOException;

/**
* Detects read operations that are executed on the last 16 bytes of the index input which is where Lucene stores the footer checksum
* of Lucene files. If such a read is detected this method tries to complete the read operation by reading the checksum from the
* {@link FileInfo} in memory rather than reading the bytes from the {@link BufferedIndexInput} because that could trigger more cache
* operations.
*
* @return true if the footer checksum has been read from the {@link FileInfo}
*/
private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
final int remaining = b.remaining();
if (remaining != CodecUtil.footerLength()) {
return false;
}
final long position = getFilePointer() + this.offset;
if (position != fileInfo.length() - CodecUtil.footerLength()) {
return false;
}
if (isClone) {
return false;
}
boolean success = false;
try {
b.put(checksumToBytesArray(fileInfo.checksum()));
success = true;
} catch (NumberFormatException e) {
// tests disable this optimisation by passing an invalid checksum
} finally {
assert b.remaining() == (success ? 0L : remaining) : b.remaining() + " remaining bytes but success is " + success;
}
return success;
}
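
For reference, the 16 bytes this method fills follow the Lucene footer layout: a 4-byte footer magic, a 4-byte algorithm id (currently always 0) and the 8-byte CRC-32 value, all big-endian. The sketch below is not the actual ChecksumBlobContainerIndexInput.checksumToBytesArray implementation (which parses the checksum string stored in FileInfo, hence the NumberFormatException guard above); it only illustrates the byte layout, taking the numeric checksum directly:

import java.nio.ByteBuffer;
import org.apache.lucene.codecs.CodecUtil;

// Illustrative only: build the 16-byte footer for a known CRC-32 value.
static byte[] footerBytes(long crc32) {
    ByteBuffer footer = ByteBuffer.allocate(CodecUtil.footerLength()); // 16 bytes, big-endian by default
    footer.putInt(CodecUtil.FOOTER_MAGIC); // marks the start of the footer
    footer.putInt(0);                      // checksum algorithm id, always 0 today
    footer.putLong(crc32);                 // CRC-32 of everything before the footer
    return footer.array();
}
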

/**
* Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range
* spans multiple blobs then this stream will request them in turn.
@@ -173,11 +217,18 @@ public final void close() throws IOException {
if (isClone == false) {
stats.incrementCloseCount();
}
innerClose();
doClose();
}
}

public abstract void innerClose() throws IOException;
public abstract void doClose() throws IOException;

protected void ensureContext(Predicate<IOContext> predicate) throws IOException {
if (predicate.test(context) == false) {
assert false : "this method should not be used with this context " + context;
throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']');
}
}

protected final boolean assertCurrentThreadMayAccessBlobStore() {
final String threadName = Thread.currentThread().getName();
@@ -199,4 +250,15 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() {
return true;
}

protected static boolean isCacheFetchAsyncThread(final String threadName) {
return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']');
}

protected static boolean assertCurrentThreadIsNotCacheFetchAsync() {
final String threadName = Thread.currentThread().getName();
assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread ["
+ threadName
+ "] to belong to the cache fetch async thread pool";
return true;
}
}
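
With readInternal() and close() now final in the base class, concrete implementations only provide the two hooks below. A hypothetical subclass sketch, not part of this PR, reusing the types imported by the file above; the remaining BufferedIndexInput/IndexInput abstract methods such as seekInternal() and slice() are omitted by keeping the sketch abstract:

abstract class ExampleSnapshotIndexInput extends BaseSearchableSnapshotIndexInput {

    ExampleSnapshotIndexInput(Logger logger, String resourceDesc, BlobContainer blobContainer,
                              FileInfo fileInfo, IOContext context, IndexInputStats stats,
                              long offset, long length) {
        super(logger, resourceDesc, blobContainer, fileInfo, context, stats, offset, length);
    }

    @Override
    protected void doReadInternal(ByteBuffer b) throws IOException {
        // Only reached when readInternal() could not serve the request from the checksum in
        // FileInfo, e.g. fill the buffer from a cache file or from the blob store here.
    }

    @Override
    public void doClose() throws IOException {
        // Release implementation-specific resources (cache references, open streams, ...).
    }
}
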
File: CachedBlobContainerIndexInput.java

@@ -10,7 +10,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -29,7 +28,6 @@
import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput;
import org.elasticsearch.index.store.IndexInputStats;
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;

import java.io.EOFException;
@@ -42,10 +40,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.IntStream;

import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes;

public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
@@ -106,7 +102,7 @@ private CachedBlobContainerIndexInput(
int rangeSize,
int recoveryRangeSize
) {
super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
this.directory = directory;
this.cacheFileReference = cacheFileReference;
this.lastReadPosition = this.offset;
@@ -116,19 +112,12 @@ private CachedBlobContainerIndexInput(
}

@Override
public void innerClose() {
public void doClose() {
if (isClone == false) {
cacheFileReference.releaseOnClose();
}
}

private void ensureContext(Predicate<IOContext> predicate) throws IOException {
if (predicate.test(context) == false) {
assert false : "this method should not be used with this context " + context;
throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']');
}
}

private long getDefaultRangeSize() {
return (context != CACHE_WARMING_CONTEXT)
? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize)
@@ -143,24 +132,12 @@ private ByteRange computeRange(long position) {
}

@Override
protected void readInternal(ByteBuffer b) throws IOException {
protected void doReadInternal(ByteBuffer b) throws IOException {
ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
assert assertCurrentThreadIsNotCacheFetchAsync();
final long position = getFilePointer() + this.offset;
final int length = b.remaining();

// We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often
// executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer.
if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) {
if (readChecksumFromFileInfo(b)) {
logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position);
return;
}
assert b.remaining() == length;
}

logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);

try {
final CacheFile cacheFile = cacheFileReference.get();

@@ -401,26 +378,6 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e
throw new IOException("failed to read data from cache", e);
}

private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException {
assert isClone == false;
byte[] footer;
try {
footer = checksumToBytesArray(fileInfo.checksum());
} catch (NumberFormatException e) {
// tests disable this optimisation by passing an invalid checksum
footer = null;
}
if (footer == null) {
return false;
}

b.put(footer);
assert b.remaining() == 0L;
return true;

// TODO we should add this to DirectBlobContainerIndexInput too.
}

/**
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
* @return a tuple with {@code Tuple<Persistent Cache Length, Prefetched Length>} values
@@ -737,23 +694,11 @@ private static boolean assertFileChannelOpen(FileChannel fileChannel) {
return true;
}

private static boolean isCacheFetchAsyncThread(final String threadName) {
return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']');
}

private static boolean assertCurrentThreadMayWriteCacheFile() {
final String threadName = Thread.currentThread().getName();
assert isCacheFetchAsyncThread(threadName) : "expected the current thread ["
+ threadName
+ "] to belong to the cache fetch async thread pool";
return true;
}

private static boolean assertCurrentThreadIsNotCacheFetchAsync() {
final String threadName = Thread.currentThread().getName();
assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread ["
+ threadName
+ "] to belong to the cache fetch async thread pool";
return true;
}
}
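
Finally, the condition that used to live in the removed readChecksumFromFileInfo(), along with its TODO about covering other index input implementations, is now applied to every subclass by maybeReadChecksumFromFileInfo() in the base class. Restated as a standalone predicate purely for illustration (this helper does not exist in the PR):

// For a non-cloned input whose FileInfo reports fileLength == 1024, only a 16-byte read at
// absolute position 1008 (1024 - CodecUtil.footerLength()) qualifies as a footer read.
static boolean isFooterRead(long absolutePosition, int remainingBytes, long fileLength) {
    return remainingBytes == CodecUtil.footerLength()
        && absolutePosition == fileLength - CodecUtil.footerLength();
}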