Skip to content

CacheBufferedIndexInput should throw EOFException #53975

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -128,15 +129,15 @@ protected void readInternal(final byte[] buffer, final int offset, final int len
lastSeekPosition = lastReadPosition;
}

int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
private int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
assert assertFileChannelOpen(fc);
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
stats.addCachedBytesRead(bytesRead);
return bytesRead;
}

@SuppressForbidden(reason = "Use positional writes on purpose")
void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
private void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
assert assertFileChannelOpen(fc);
final long length = end - start;
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
Expand All @@ -150,6 +151,10 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
while (remaining > 0) {
final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length;
int bytesRead = input.read(copyBuffer, 0, len);
if (bytesRead == -1) {
throw new EOFException(String.format(Locale.ROOT,
"unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", start, end, remaining, cacheFileReference));
}
fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied);
bytesCopied += bytesRead;
remaining -= bytesRead;
Expand Down Expand Up @@ -214,7 +219,11 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws
while (remaining > 0) {
final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
int bytesRead = input.read(copyBuffer, 0, len);
System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, len);
if (bytesRead == -1) {
throw new EOFException(String.format(Locale.ROOT,
"unexpected EOF reading [%d-%d] ([%d] bytes remaining) from %s", start, end, remaining, cacheFileReference));
}
System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, bytesRead);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaces len with bytesRead in case of a partial read. I think this was technically ok before since we would proceed to overwrite the junk bytes with correct ones, but it was confusing.

bytesCopied += bytesRead;
remaining -= bytesRead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.mockstore.BlobContainerWrapper;

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -77,6 +79,52 @@ public void testRandomReads() throws IOException {
}
}

public void testThrowsEOFException() throws IOException {
try (CacheService cacheService = createCacheService(random())) {
cacheService.start();

SnapshotId snapshotId = new SnapshotId("_name", "_uuid");
IndexId indexId = new IndexId("_name", "_uuid");
ShardId shardId = new ShardId("_name", "_uuid", 0);

final String fileName = randomAlphaOfLength(10);
final byte[] input = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8);

final String blobName = randomUnicodeOfLength(10);
final StoreFileMetaData metaData = new StoreFileMetaData(fileName, input.length + 1, "_na", Version.CURRENT.luceneVersion);
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L,
List.of(new BlobStoreIndexShardSnapshot.FileInfo(blobName, metaData, new ByteSizeValue(input.length + 1))), 0L, 0L, 0, 0L);

final BlobContainer blobContainer = singleBlobContainer(blobName, input);

final Path cacheDir = createTempDir();
try (CacheDirectory cacheDirectory
= new CacheDirectory(snapshot, blobContainer, cacheService, cacheDir, snapshotId, indexId, shardId, () -> 0L)) {
try (IndexInput indexInput = cacheDirectory.openInput(fileName, newIOContext(random()))) {
final byte[] buffer = new byte[input.length + 1];
final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length));
if (containsEOFException(exception, new HashSet<>()) == false) {
throw new AssertionError("inner EOFException not thrown", exception);
}
}
}
}
}

private boolean containsEOFException(Throwable throwable, HashSet<Throwable> seenThrowables) {
if (throwable == null || seenThrowables.add(throwable) == false) {
return false;
}
if (throwable instanceof EOFException) {
return true;
}
for (Throwable suppressed : throwable.getSuppressed()) {
if (containsEOFException(suppressed, seenThrowables)) {
return true;
}
}
return containsEOFException(throwable.getCause(), seenThrowables);
}

/**
* BlobContainer that counts the number of {@link java.io.InputStream} it opens, as well as the
Expand Down