Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,

/** Stream statistics. */
private final AbfsInputStreamStatistics streamStatistics;
private long bytesFromReadAhead; // bytes read from readAhead; for testing
private long bytesFromRemoteRead; // bytes read remotely; for testing

public AbfsInputStream(
final AbfsClient client,
Expand Down Expand Up @@ -235,6 +237,7 @@ private int readInternal(final long position, final byte[] b, final int offset,

// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
LOG.debug("Received data from read ahead, not doing remote read");
Expand Down Expand Up @@ -302,6 +305,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
throw new IOException("Unexpected Content-Length");
}
LOG.debug("HTTP request read bytes = {}", bytesRead);
bytesFromRemoteRead += bytesRead;
return (int) bytesRead;
}

Expand Down Expand Up @@ -503,6 +507,26 @@ public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}

/**
* Getter for bytes read from readAhead buffer that fills asynchronously.
*
* @return value of the counter in long.
*/
@VisibleForTesting
public long getBytesFromReadAhead() {
return bytesFromReadAhead;
}

/**
* Getter for bytes read remotely from the data store.
*
* @return value of the counter in long.
*/
@VisibleForTesting
public long getBytesFromRemoteRead() {
return bytesFromRemoteRead;
}

/**
* Get the statistics of the stream.
* @return a string value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ public class ITestAbfsInputStreamStatistics
private static final int ONE_MB = 1024 * 1024;
private static final int ONE_KB = 1024;
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
private static final int THREAD_SLEEP_10_SECONDS = 10;
private static final int TIMEOUT_30_SECONDS = 30000;
private byte[] defBuffer = new byte[ONE_MB];

public ITestAbfsInputStreamStatistics() throws Exception {
Expand Down Expand Up @@ -295,8 +292,8 @@ public void testWithNullStreamStatistics() throws IOException {
/**
* Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
*/
@Test(timeout = TIMEOUT_30_SECONDS)
public void testReadAheadCounters() throws IOException, InterruptedException {
@Test
public void testReadAheadCounters() throws IOException {
describe("Test to check correct values for readAhead counters in "
+ "AbfsInputStream");

Expand Down Expand Up @@ -334,46 +331,35 @@ public void testReadAheadCounters() throws IOException, InterruptedException {
AbfsInputStreamStatisticsImpl stats =
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

/*
* Since, readAhead is done in background threads. Sometimes, the
* threads aren't finished in the background and could result in
* inaccurate results. So, we wait till we have the accurate values
* with a limit of 30 seconds as that's when the test times out.
*
*/
while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
|| stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
Thread.sleep(THREAD_SLEEP_10_SECONDS);
}

/*
* Verifying the counter values of readAheadBytesRead and remoteBytesRead.
*
* readAheadBytesRead : Since, we read 1KBs 5 times, that means we go
* from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
* we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
* buffer. Our read is till 5KB, hence readAhead would ideally read 2
* blocks of 4KB which is equal to 8KB. But, sometimes to get more than
* one block from readAhead buffer we might have to wait for background
* blocks of 4KB which is equal to 8KB. But, sometimes to get blocks
* from readAhead buffer we might have to wait for background
* threads to fill the buffer and hence we might do remote read which
* would be faster. Therefore, readAheadBytesRead would be equal to or
* greater than 4KB.
* would be faster. Therefore, readAheadBytesRead would be greater than
* or equal to the value of bytesFromReadAhead at the point we measure it.
*
* remoteBytesRead : Since, the bufferSize is set to 4KB and the number
* of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
* KB buffer on the first read, which is equal to 32KB. But, if we are not
* able to read some bytes that were in the buffer after doing
* readAhead, we might use remote read again. Thus, the bytes read
* remotely could also be greater than 32Kb.
* remotely would be greater than or equal to the bytesFromRemoteRead
* value that we measure at some point of the operation.
*
*/
Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
"Mismatch in readAheadBytesRead counter value")
.isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
.isGreaterThanOrEqualTo(in.getBytesFromReadAhead());

Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
"Mismatch in remoteBytesRead counter value")
.isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
.isGreaterThanOrEqualTo(in.getBytesFromRemoteRead());

} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
Expand Down