Skip to content

Commit 23dbc20

Browse files
sahilTakiardeepakdamri
authored andcommitted
HDFS-3246: pRead equivalent for direct read path (apache#597)
HDFS-3246: pRead equivalent for direct read path Contributed by Sahil Takiar
1 parent 61b8764 commit 23dbc20

File tree

10 files changed

+854
-52
lines changed

10 files changed

+854
-52
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class FSDataInputStream extends DataInputStream
3939
implements Seekable, PositionedReadable,
4040
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
41-
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
41+
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
42+
ByteBufferPositionedReadable {
4243
/**
4344
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4445
* objects
@@ -147,7 +148,8 @@ public int read(ByteBuffer buf) throws IOException {
147148
return ((ByteBufferReadable)in).read(buf);
148149
}
149150

150-
throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
151+
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
152+
"by input stream");
151153
}
152154

153155
@Override
@@ -246,4 +248,13 @@ public boolean hasCapability(String capability) {
246248
public String toString() {
247249
return super.toString() + ": " + in;
248250
}
251+
252+
@Override
253+
public int read(long position, ByteBuffer buf) throws IOException {
254+
if (in instanceof ByteBufferPositionedReadable) {
255+
return ((ByteBufferPositionedReadable) in).read(position, buf);
256+
}
257+
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
258+
"by input stream");
259+
}
249260
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public interface StreamCapabilities {
5959
*/
6060
String UNBUFFER = "in:unbuffer";
6161

62+
/**
63+
* Stream read(ByteBuffer) capability implemented by
64+
* {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
65+
*/
66+
String READBYTEBUFFER = "in:readbytebuffer";
67+
68+
/**
69+
* Stream read(long, ByteBuffer) capability implemented by
70+
* {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
71+
*/
72+
String PREADBYTEBUFFER = "in:preadbytebuffer";
73+
6274
/**
6375
* Capabilities that a stream can support and be queried for.
6476
*/

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

Lines changed: 175 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.EnumSet;
2727
import java.util.Random;
2828

29+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
2930
import org.apache.hadoop.fs.ByteBufferReadable;
3031
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
3132
import org.apache.hadoop.fs.CanUnbuffer;
@@ -130,6 +131,32 @@ private void preadCheck(PositionedReadable in) throws Exception {
130131
Assert.assertArrayEquals(result, expectedData);
131132
}
132133

134+
private int byteBufferPreadAll(ByteBufferPositionedReadable in,
135+
ByteBuffer buf) throws IOException {
136+
int n = 0;
137+
int total = 0;
138+
while (n != -1) {
139+
total += n;
140+
if (!buf.hasRemaining()) {
141+
break;
142+
}
143+
n = in.read(total, buf);
144+
}
145+
146+
return total;
147+
}
148+
149+
private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
150+
throws Exception {
151+
ByteBuffer result = ByteBuffer.allocate(dataLen);
152+
int n = byteBufferPreadAll(in, result);
153+
154+
Assert.assertEquals(dataLen, n);
155+
ByteBuffer expectedData = ByteBuffer.allocate(n);
156+
expectedData.put(data, 0, n);
157+
Assert.assertArrayEquals(result.array(), expectedData.array());
158+
}
159+
133160
protected OutputStream getOutputStream(int bufferSize) throws IOException {
134161
return getOutputStream(bufferSize, key, iv);
135162
}
@@ -289,20 +316,36 @@ private int readAll(InputStream in, long pos, byte[] b, int off, int len)
289316

290317
return total;
291318
}
319+
320+
private int readAll(InputStream in, long pos, ByteBuffer buf)
321+
throws IOException {
322+
int n = 0;
323+
int total = 0;
324+
while (n != -1) {
325+
total += n;
326+
if (!buf.hasRemaining()) {
327+
break;
328+
}
329+
n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
330+
}
331+
332+
return total;
333+
}
292334

293335
/** Test positioned read. */
294336
@Test(timeout=120000)
295337
public void testPositionedRead() throws Exception {
296-
OutputStream out = getOutputStream(defaultBufferSize);
297-
writeData(out);
338+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
339+
writeData(out);
340+
}
298341

299-
InputStream in = getInputStream(defaultBufferSize);
300-
// Pos: 1/3 dataLen
301-
positionedReadCheck(in , dataLen / 3);
342+
try (InputStream in = getInputStream(defaultBufferSize)) {
343+
// Pos: 1/3 dataLen
344+
positionedReadCheck(in, dataLen / 3);
302345

303-
// Pos: 1/2 dataLen
304-
positionedReadCheck(in, dataLen / 2);
305-
in.close();
346+
// Pos: 1/2 dataLen
347+
positionedReadCheck(in, dataLen / 2);
348+
}
306349
}
307350

308351
private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -316,6 +359,35 @@ private void positionedReadCheck(InputStream in, int pos) throws Exception {
316359
System.arraycopy(data, pos, expectedData, 0, n);
317360
Assert.assertArrayEquals(readData, expectedData);
318361
}
362+
363+
/** Test positioned read with ByteBuffers. */
364+
@Test(timeout=120000)
365+
public void testPositionedReadWithByteBuffer() throws Exception {
366+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
367+
writeData(out);
368+
}
369+
370+
try (InputStream in = getInputStream(defaultBufferSize)) {
371+
// Pos: 1/3 dataLen
372+
positionedReadCheckWithByteBuffer(in, dataLen / 3);
373+
374+
// Pos: 1/2 dataLen
375+
positionedReadCheckWithByteBuffer(in, dataLen / 2);
376+
}
377+
}
378+
379+
private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
380+
throws Exception {
381+
ByteBuffer result = ByteBuffer.allocate(dataLen);
382+
int n = readAll(in, pos, result);
383+
384+
Assert.assertEquals(dataLen, n + pos);
385+
byte[] readData = new byte[n];
386+
System.arraycopy(result.array(), 0, readData, 0, n);
387+
byte[] expectedData = new byte[n];
388+
System.arraycopy(data, pos, expectedData, 0, n);
389+
Assert.assertArrayEquals(readData, expectedData);
390+
}
319391

320392
/** Test read fully. */
321393
@Test(timeout=120000)
@@ -559,12 +631,40 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
559631
System.arraycopy(data, 0, expectedData, 0, n);
560632
Assert.assertArrayEquals(readData, expectedData);
561633
}
634+
635+
private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
636+
int bufPos) throws Exception {
637+
// Test reading from position 0
638+
buf.position(bufPos);
639+
int n = ((ByteBufferPositionedReadable) in).read(0, buf);
640+
Assert.assertEquals(bufPos + n, buf.position());
641+
byte[] readData = new byte[n];
642+
buf.rewind();
643+
buf.position(bufPos);
644+
buf.get(readData);
645+
byte[] expectedData = new byte[n];
646+
System.arraycopy(data, 0, expectedData, 0, n);
647+
Assert.assertArrayEquals(readData, expectedData);
648+
649+
// Test reading from half way through the data
650+
buf.position(bufPos);
651+
n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
652+
Assert.assertEquals(bufPos + n, buf.position());
653+
readData = new byte[n];
654+
buf.rewind();
655+
buf.position(bufPos);
656+
buf.get(readData);
657+
expectedData = new byte[n];
658+
System.arraycopy(data, dataLen / 2, expectedData, 0, n);
659+
Assert.assertArrayEquals(readData, expectedData);
660+
}
562661

563662
/** Test byte buffer read with different buffer size. */
564663
@Test(timeout=120000)
565664
public void testByteBufferRead() throws Exception {
566-
OutputStream out = getOutputStream(defaultBufferSize);
567-
writeData(out);
665+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
666+
writeData(out);
667+
}
568668

569669
// Default buffer size, initial buffer position is 0
570670
InputStream in = getInputStream(defaultBufferSize);
@@ -614,6 +714,53 @@ public void testByteBufferRead() throws Exception {
614714
byteBufferReadCheck(in, buf, 11);
615715
in.close();
616716
}
717+
718+
/** Test byte buffer pread with different buffer size. */
719+
@Test(timeout=120000)
720+
public void testByteBufferPread() throws Exception {
721+
try (OutputStream out = getOutputStream(defaultBufferSize)) {
722+
writeData(out);
723+
}
724+
725+
try (InputStream defaultBuf = getInputStream(defaultBufferSize);
726+
InputStream smallBuf = getInputStream(smallBufferSize)) {
727+
728+
ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
729+
730+
// Default buffer size, initial buffer position is 0
731+
byteBufferPreadCheck(defaultBuf, buf, 0);
732+
733+
// Default buffer size, initial buffer position is not 0
734+
buf.clear();
735+
byteBufferPreadCheck(defaultBuf, buf, 11);
736+
737+
// Small buffer size, initial buffer position is 0
738+
buf.clear();
739+
byteBufferPreadCheck(smallBuf, buf, 0);
740+
741+
// Small buffer size, initial buffer position is not 0
742+
buf.clear();
743+
byteBufferPreadCheck(smallBuf, buf, 11);
744+
745+
// Test with direct ByteBuffer
746+
buf = ByteBuffer.allocateDirect(dataLen + 100);
747+
748+
// Direct buffer, default buffer size, initial buffer position is 0
749+
byteBufferPreadCheck(defaultBuf, buf, 0);
750+
751+
// Direct buffer, default buffer size, initial buffer position is not 0
752+
buf.clear();
753+
byteBufferPreadCheck(defaultBuf, buf, 11);
754+
755+
// Direct buffer, small buffer size, initial buffer position is 0
756+
buf.clear();
757+
byteBufferPreadCheck(smallBuf, buf, 0);
758+
759+
// Direct buffer, small buffer size, initial buffer position is not 0
760+
buf.clear();
761+
byteBufferPreadCheck(smallBuf, buf, 11);
762+
}
763+
}
617764

618765
@Test(timeout=120000)
619766
public void testCombinedOp() throws Exception {
@@ -851,5 +998,23 @@ public void testUnbuffer() throws Exception {
851998
// The close will be called when exiting this try-with-resource block
852999
}
8531000
}
1001+
1002+
// Test ByteBuffer pread
1003+
try (InputStream in = getInputStream(smallBufferSize)) {
1004+
if (in instanceof ByteBufferPositionedReadable) {
1005+
ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;
1006+
1007+
// Test unbuffer after pread
1008+
byteBufferPreadCheck(bbpin);
1009+
((CanUnbuffer) in).unbuffer();
1010+
1011+
// Test pread again after unbuffer
1012+
byteBufferPreadCheck(bbpin);
1013+
1014+
// Test close after unbuffer
1015+
((CanUnbuffer) in).unbuffer();
1016+
// The close will be called when exiting this try-with-resource block
1017+
}
1018+
}
8541019
}
8551020
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.EnumSet;
2727

2828
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
2930
import org.apache.hadoop.fs.ByteBufferReadable;
3031
import org.apache.hadoop.fs.CanSetDropBehind;
3132
import org.apache.hadoop.fs.CanSetReadahead;
@@ -180,7 +181,7 @@ static class FakeInputStream extends InputStream
180181
implements Seekable, PositionedReadable, ByteBufferReadable,
181182
HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
182183
HasEnhancedByteBufferAccess, CanUnbuffer,
183-
StreamCapabilities {
184+
StreamCapabilities, ByteBufferPositionedReadable {
184185
private final byte[] oneByteBuf = new byte[1];
185186
private int pos = 0;
186187
private final byte[] data;
@@ -303,6 +304,32 @@ public int read(long position, byte[] b, int off, int len)
303304
return -1;
304305
}
305306

307+
@Override
308+
public int read(long position, ByteBuffer buf) throws IOException {
309+
if (buf == null) {
310+
throw new NullPointerException();
311+
} else if (!buf.hasRemaining()) {
312+
return 0;
313+
}
314+
315+
if (position > length) {
316+
throw new IOException("Cannot read after EOF.");
317+
}
318+
if (position < 0) {
319+
throw new IOException("Cannot read to negative offset.");
320+
}
321+
322+
checkStream();
323+
324+
if (position < length) {
325+
int n = (int) Math.min(buf.remaining(), length - position);
326+
buf.put(data, (int) position, n);
327+
return n;
328+
}
329+
330+
return -1;
331+
}
332+
306333
@Override
307334
public void readFully(long position, byte[] b, int off, int len)
308335
throws IOException {
@@ -378,6 +405,8 @@ public boolean hasCapability(String capability) {
378405
case StreamCapabilities.READAHEAD:
379406
case StreamCapabilities.DROPBEHIND:
380407
case StreamCapabilities.UNBUFFER:
408+
case StreamCapabilities.READBYTEBUFFER:
409+
case StreamCapabilities.PREADBYTEBUFFER:
381410
return true;
382411
default:
383412
return false;
@@ -439,7 +468,9 @@ public void testHasCapability() throws Exception {
439468
new String[] {
440469
StreamCapabilities.DROPBEHIND,
441470
StreamCapabilities.READAHEAD,
442-
StreamCapabilities.UNBUFFER
471+
StreamCapabilities.UNBUFFER,
472+
StreamCapabilities.READBYTEBUFFER,
473+
StreamCapabilities.PREADBYTEBUFFER
443474
},
444475
new String[] {
445476
StreamCapabilities.HFLUSH,

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,21 @@ protected InputStream getInputStream(int bufferSize, byte[] key, byte[] iv)
9090
@Override
9191
@Test(timeout=10000)
9292
public void testByteBufferRead() throws Exception {}
93+
94+
@Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
95+
@Override
96+
@Test(timeout=10000)
97+
public void testPositionedReadWithByteBuffer() throws IOException {}
9398

9499
@Ignore("ChecksumFSOutputSummer doesn't support Syncable")
95100
@Override
96101
@Test(timeout=10000)
97102
public void testSyncable() throws IOException {}
103+
104+
@Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
105+
@Override
106+
@Test(timeout=10000)
107+
public void testByteBufferPread() throws IOException {}
98108

99109
@Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
100110
@Override

0 commit comments

Comments
 (0)