Skip to content

Commit 28f1ded

Browse files
mehakmeetsteveloughran
authored andcommitted
HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)
Contributed by Mehakmeet Singh Change-Id: I6bbd8165385a9267ed64831bb1efa18b6554feb1
1 parent 7970710 commit 28f1ded

File tree

4 files changed

+149
-0
lines changed

4 files changed

+149
-0
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ private int readInternal(final long position, final byte[] b, final int offset,
238238
if (receivedBytes > 0) {
239239
incrementReadOps();
240240
LOG.debug("Received data from read ahead, not doing remote read");
241+
if (streamStatistics != null) {
242+
streamStatistics.readAheadBytesRead(receivedBytes);
243+
}
241244
return receivedBytes;
242245
}
243246

@@ -292,6 +295,9 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
292295
throw new IOException(ex);
293296
}
294297
long bytesRead = op.getResult().getBytesReceived();
298+
if (streamStatistics != null) {
299+
streamStatistics.remoteBytesRead(bytesRead);
300+
}
295301
if (bytesRead > Integer.MAX_VALUE) {
296302
throw new IOException("Unexpected Content-Length");
297303
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ public interface AbfsInputStreamStatistics {
8484
*/
8585
void remoteReadOperation();
8686

87+
/**
88+
* Records the bytes read from readAhead buffer.
89+
* @param bytes the bytes to be incremented.
90+
*/
91+
void readAheadBytesRead(long bytes);
92+
93+
/**
94+
* Records bytes read remotely after nothing from readAheadBuffer was read.
95+
* @param bytes the bytes to be incremented.
96+
*/
97+
void remoteBytesRead(long bytes);
98+
8799
/**
88100
* Makes the string of all the AbfsInputStream statistics.
89101
* @return the string with all the statistics.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl
3333
private long readOperations;
3434
private long bytesReadFromBuffer;
3535
private long remoteReadOperations;
36+
private long readAheadBytesRead;
37+
private long remoteBytesRead;
3638

3739
/**
3840
* Seek backwards, incrementing the seek and backward seek counters.
@@ -128,6 +130,30 @@ public void readOperationStarted(long pos, long len) {
128130
readOperations++;
129131
}
130132

133+
/**
134+
* Total bytes read from readAhead buffer during a read operation.
135+
*
136+
* @param bytes the bytes to be incremented.
137+
*/
138+
@Override
139+
public void readAheadBytesRead(long bytes) {
140+
if (bytes > 0) {
141+
readAheadBytesRead += bytes;
142+
}
143+
}
144+
145+
/**
146+
* Total bytes read remotely after nothing was read from readAhead buffer.
147+
*
148+
* @param bytes the bytes to be incremented.
149+
*/
150+
@Override
151+
public void remoteBytesRead(long bytes) {
152+
if (bytes > 0) {
153+
remoteBytesRead += bytes;
154+
}
155+
}
156+
131157
/**
132158
* {@inheritDoc}
133159
*
@@ -178,6 +204,14 @@ public long getRemoteReadOperations() {
178204
return remoteReadOperations;
179205
}
180206

207+
public long getReadAheadBytesRead() {
208+
return readAheadBytesRead;
209+
}
210+
211+
public long getRemoteBytesRead() {
212+
return remoteBytesRead;
213+
}
214+
181215
/**
182216
* String operator describes all the current statistics.
183217
* <b>Important: there are no guarantees as to the stability
@@ -199,6 +233,8 @@ public String toString() {
199233
sb.append(", ReadOperations=").append(readOperations);
200234
sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
201235
sb.append(", remoteReadOperations=").append(remoteReadOperations);
236+
sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
237+
sb.append(", remoteBytesRead=").append(remoteBytesRead);
202238
sb.append('}');
203239
return sb.toString();
204240
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222

23+
import org.assertj.core.api.Assertions;
2324
import org.junit.Test;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
@@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics
3940
LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
4041
private static final int ONE_MB = 1024 * 1024;
4142
private static final int ONE_KB = 1024;
43+
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
44+
private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
45+
private static final int THREAD_SLEEP_10_SECONDS = 10;
46+
private static final int TIMEOUT_30_SECONDS = 30000;
4247
private byte[] defBuffer = new byte[ONE_MB];
4348

4449
public ITestAbfsInputStreamStatistics() throws Exception {
@@ -75,6 +80,8 @@ public void testInitValues() throws IOException {
7580
checkInitValue(stats.getReadOperations(), "readOps");
7681
checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
7782
checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
83+
checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
84+
checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");
7885

7986
} finally {
8087
IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
@@ -285,6 +292,94 @@ public void testWithNullStreamStatistics() throws IOException {
285292
}
286293
}
287294

295+
/**
296+
* Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
297+
*/
298+
@Test(timeout = TIMEOUT_30_SECONDS)
299+
public void testReadAheadCounters() throws IOException, InterruptedException {
300+
describe("Test to check correct values for readAhead counters in "
301+
+ "AbfsInputStream");
302+
303+
AzureBlobFileSystem fs = getFileSystem();
304+
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
305+
Path readAheadCountersPath = path(getMethodName());
306+
307+
/*
308+
* Setting the block size for readAhead as 4KB.
309+
*/
310+
abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);
311+
312+
AbfsOutputStream out = null;
313+
AbfsInputStream in = null;
314+
315+
try {
316+
317+
/*
318+
* Creating a file of 1MB size.
319+
*/
320+
out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath);
321+
out.write(defBuffer);
322+
out.close();
323+
324+
in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics());
325+
326+
/*
327+
* Reading 1KB after each i * KB positions. Hence the reads are from 0
328+
* to 1KB, 1KB to 2KB, and so on.. for 5 operations.
329+
*/
330+
for (int i = 0; i < 5; i++) {
331+
in.seek(ONE_KB * i);
332+
in.read(defBuffer, ONE_KB * i, ONE_KB);
333+
}
334+
AbfsInputStreamStatisticsImpl stats =
335+
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
336+
337+
/*
338+
* Since, readAhead is done in background threads. Sometimes, the
339+
* threads aren't finished in the background and could result in
340+
* inaccurate results. So, we wait till we have the accurate values
341+
* with a limit of 30 seconds as that's when the test times out.
342+
*
343+
*/
344+
while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
345+
|| stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
346+
Thread.sleep(THREAD_SLEEP_10_SECONDS);
347+
}
348+
349+
/*
350+
* Verifying the counter values of readAheadBytesRead and remoteBytesRead.
351+
*
352+
* readAheadBytesRead : Since, we read 1KBs 5 times, that means we go
353+
* from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
354+
* we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
355+
* buffer. Our read is till 5KB, hence readAhead would ideally read 2
356+
* blocks of 4KB which is equal to 8KB. But, sometimes to get more than
357+
* one block from readAhead buffer we might have to wait for background
358+
* threads to fill the buffer and hence we might do remote read which
359+
* would be faster. Therefore, readAheadBytesRead would be equal to or
360+
* greater than 4KB.
361+
*
362+
* remoteBytesRead : Since, the bufferSize is set to 4KB and the number
363+
* of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
364+
* KB buffer on the first read, which is equal to 32KB. But, if we are not
365+
* able to read some bytes that were in the buffer after doing
366+
* readAhead, we might use remote read again. Thus, the bytes read
367+
* remotely could also be greater than 32Kb.
368+
*
369+
*/
370+
Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
371+
"Mismatch in readAheadBytesRead counter value")
372+
.isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
373+
374+
Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
375+
"Mismatch in remoteBytesRead counter value")
376+
.isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
377+
378+
} finally {
379+
IOUtils.cleanupWithLogger(LOG, out, in);
380+
}
381+
}
382+
288383
/**
289384
* Method to assert the initial values of the statistics.
290385
*

0 commit comments

Comments
 (0)