Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -43,13 +42,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;

@RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
Expand Down Expand Up @@ -281,16 +281,11 @@ public void testEOFRanges() throws Exception {
in.readVectored(fileRanges, allocate);
for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData();
try {
ByteBuffer buffer = data.get();
// Shouldn't reach here.
Assert.fail("EOFException must be thrown while reading EOF");
} catch (ExecutionException ex) {
// ignore as expected.
} catch (Exception ex) {
LOG.error("Exception while running vectored read ", ex);
Assert.fail("Exception while running vectored read " + ex);
}
interceptFuture(EOFException.class,
"",
ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
data);
}
}
}
Expand Down Expand Up @@ -410,7 +405,7 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead(
fs.openFile(path(VECTORED_READ_FILE_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
LambdaTestUtils.intercept(clazz,
intercept(clazz,
() -> in.readVectored(fileRanges, allocate));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,23 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_OPERATIONS,
"Count of read() operations in an input stream",
TYPE_COUNTER),
STREAM_READ_VECTORED_OPERATIONS(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
"Count of readVectored() operations in an input stream.",
TYPE_COUNTER),
STREAM_READ_VECTORED_READ_BYTES_DISCARDED(
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
"Count of bytes discarded during readVectored() operation." +
" in an input stream",
TYPE_COUNTER),
STREAM_READ_VECTORED_INCOMING_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
"Count of incoming file ranges during readVectored() operation.",
TYPE_COUNTER),
STREAM_READ_VECTORED_COMBINED_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
"Count of combined file ranges during readVectored() operation.",
TYPE_COUNTER),
STREAM_READ_REMOTE_STREAM_ABORTED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
"Duration of aborting a remote stream during stream IO",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,146 +176,172 @@ public void testSameRanges() throws Exception {
* */
@Test
public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
FileSystem fs = getTestFileSystemWithReadAheadDisabled();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));

FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, getPool());

// audit the io statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));

// the vectored io operation must be tracked
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
1);

// the vectored io operation is being called with 5 input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
5);

// 5 input ranges got combined in 3 as some of them are close.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
3);

// number of bytes discarded will be based on the above input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
5944);

verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
3);

// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);

}

CompletableFuture<FSDataInputStream> builder1 =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));

FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, getPool());

// audit the io statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));

// the vectored io operation must be tracked
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
1);

// the vectored io operation is being called with 5 input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
5);

// 5 input ranges got combined in 3 as some of them are close.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
3);

// number of bytes discarded will be based on the above input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
5944);

verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
3);

// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);

try (FSDataInputStream in = builder1.get()) {
for (FileRange range : fileRanges) {
byte[] temp = new byte[range.getLength()];
in.readFully((int) range.getOffset(), temp, 0, range.getLength());
}

// audit the statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));

verifyStatisticCounterValue(st,
CompletableFuture<FSDataInputStream> builder1 =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();

try (FSDataInputStream in = builder1.get()) {
for (FileRange range : fileRanges) {
byte[] temp = new byte[range.getLength()];
in.readFully((int) range.getOffset(), temp, 0, range.getLength());
}

// audit the statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));

verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
0);

// all other counter values consistent.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
5);

// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);
}
// validate stats are getting merged at fs instance level.
IOStatistics fsStats = fs.getIOStatistics();
// only 1 vectored io call is made in this fs instance.
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
0);

// all other counter values consistent.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
verifyStatisticCounterValue(st,
1);
// 8 get requests were made in this fs instance.
verifyStatisticCounterValue(fsStats,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
5);
8);

// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_BYTES,
1424);
2848);
}
}

@Test
public void testMultiVectoredReadStatsCollection() throws Exception {
FileSystem fs = getTestFileSystemWithReadAheadDisabled();
List<FileRange> ranges1 = getConsecutiveRanges();
List<FileRange> ranges2 = getConsecutiveRanges();
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(ranges1, getAllocate());
in.readVectored(ranges2, getAllocate());
validateVectoredReadResult(ranges1, DATASET);
validateVectoredReadResult(ranges2, DATASET);
returnBuffersToPoolPostRead(ranges1, getPool());
returnBuffersToPoolPostRead(ranges2, getPool());

// audit the io statistics for this stream
IOStatistics st = in.getIOStatistics();

// 2 vectored io calls are made above.
verifyStatisticCounterValue(st,
try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
List<FileRange> ranges1 = getConsecutiveRanges();
List<FileRange> ranges2 = getConsecutiveRanges();
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(ranges1, getAllocate());
in.readVectored(ranges2, getAllocate());
validateVectoredReadResult(ranges1, DATASET);
validateVectoredReadResult(ranges2, DATASET);
returnBuffersToPoolPostRead(ranges1, getPool());
returnBuffersToPoolPostRead(ranges2, getPool());

// audit the io statistics for this stream
IOStatistics st = in.getIOStatistics();

// 2 vectored io calls are made above.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2);

// 2 vectored io operation is being called with 2 input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
4);

// 2 ranges are getting merged in 1 during both vectored io operation.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
2);

// number of bytes discarded will be 0 as the ranges are consecutive.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
// only 2 http get request will be made because ranges in both range list will be merged
// to 1 because they are consecutive.
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2);
// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
2000);
}
IOStatistics fsStats = fs.getIOStatistics();
// 2 vectored io calls are made in this fs instance.
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2);

// 2 vectored io operation is being called with 2 input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
4);

// 2 ranges are getting merged in 1 during both vectored io operation.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
2);

// number of bytes discarded will be 0 as the ranges are consecutive.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
// only 2 http get request will be made because ranges in both range list will be merged
// to 1 because they are consecutive.
verifyStatisticCounterValue(st,
// 2 get requests were made in this fs instance.
verifyStatisticCounterValue(fsStats,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2);
// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
2000);
}
}

private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
Configuration conf = getFileSystem().getConf();
// also resetting the min seek and max size values is important
// as this same test suite has test which overrides these params.
Expand Down