Skip to content

Commit e86dce6

Browse files
author
Sahil Takiar
committed
HDFS-3246: Addressing PR comments
1 parent 0cbc30b commit e86dce6

File tree

5 files changed

+118
-75
lines changed

5 files changed

+118
-75
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 55 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -329,19 +329,18 @@ public void close() throws IOException {
329329
public int read(long position, byte[] buffer, int offset, int length)
330330
throws IOException {
331331
checkStream();
332-
if (in instanceof PositionedReadable) {
333-
final int n = ((PositionedReadable) in).read(position, buffer, offset,
334-
length);
335-
if (n > 0) {
336-
// This operation does not change the current offset of the file
337-
decrypt(position, buffer, offset, n);
338-
}
339-
340-
return n;
341-
} else {
332+
if (!(in instanceof PositionedReadable)) {
342333
throw new UnsupportedOperationException("This stream does not support " +
343334
"positioned read.");
344335
}
336+
final int n = ((PositionedReadable) in).read(position, buffer, offset,
337+
length);
338+
if (n > 0) {
339+
// This operation does not change the current offset of the file
340+
decrypt(position, buffer, offset, n);
341+
}
342+
343+
return n;
345344
}
346345

347346
/**
@@ -351,19 +350,18 @@ public int read(long position, byte[] buffer, int offset, int length)
351350
public int read(long position, final ByteBuffer buf)
352351
throws IOException {
353352
checkStream();
354-
if (in instanceof ByteBufferPositionedReadable) {
355-
int bufPos = buf.position();
356-
final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
357-
if (n > 0) {
358-
// This operation does not change the current offset of the file
359-
decrypt(position, buf, n, bufPos);
360-
}
361-
362-
return n;
363-
} else {
353+
if (!(in instanceof ByteBufferPositionedReadable)) {
364354
throw new UnsupportedOperationException("This stream does not support " +
365355
"positioned reads with byte buffers.");
366356
}
357+
int bufPos = buf.position();
358+
final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
359+
if (n > 0) {
360+
// This operation does not change the current offset of the file
361+
decrypt(position, buf, n, bufPos);
362+
}
363+
364+
return n;
367365
}
368366

369367
/**
@@ -481,16 +479,15 @@ private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
481479
public void readFully(long position, byte[] buffer, int offset, int length)
482480
throws IOException {
483481
checkStream();
484-
if (in instanceof PositionedReadable) {
485-
((PositionedReadable) in).readFully(position, buffer, offset, length);
486-
if (length > 0) {
487-
// This operation does not change the current offset of the file
488-
decrypt(position, buffer, offset, length);
489-
}
490-
} else {
482+
if (!(in instanceof PositionedReadable)) {
491483
throw new UnsupportedOperationException("This stream does not support " +
492484
"positioned readFully.");
493485
}
486+
((PositionedReadable) in).readFully(position, buffer, offset, length);
487+
if (length > 0) {
488+
// This operation does not change the current offset of the file
489+
decrypt(position, buffer, offset, length);
490+
}
494491
}
495492

496493
@Override
@@ -515,13 +512,12 @@ public void seek(long pos) throws IOException {
515512
outBuffer.position(outBuffer.position() + forward);
516513
}
517514
} else {
518-
if (in instanceof Seekable) {
519-
((Seekable) in).seek(pos);
520-
resetStreamOffset(pos);
521-
} else {
515+
if (!(in instanceof Seekable)) {
522516
throw new UnsupportedOperationException("This stream does not " +
523517
"support seek.");
524518
}
519+
((Seekable) in).seek(pos);
520+
resetStreamOffset(pos);
525521
}
526522
}
527523

@@ -675,14 +671,13 @@ public boolean seekToNewSource(long targetPos) throws IOException {
675671
Preconditions.checkArgument(targetPos >= 0,
676672
"Cannot seek to negative offset.");
677673
checkStream();
678-
if (in instanceof Seekable) {
679-
boolean result = ((Seekable) in).seekToNewSource(targetPos);
680-
resetStreamOffset(targetPos);
681-
return result;
682-
} else {
674+
if (!(in instanceof Seekable)) {
683675
throw new UnsupportedOperationException("This stream does not support " +
684676
"seekToNewSource.");
685677
}
678+
boolean result = ((Seekable) in).seekToNewSource(targetPos);
679+
resetStreamOffset(targetPos);
680+
return result;
686681
}
687682

688683
@Override
@@ -691,63 +686,58 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
691686
UnsupportedOperationException {
692687
checkStream();
693688
if (outBuffer.remaining() > 0) {
694-
if (in instanceof Seekable) {
695-
// Have some decrypted data unread, need to reset.
696-
((Seekable) in).seek(getPos());
697-
resetStreamOffset(getPos());
698-
} else {
689+
if (!(in instanceof Seekable)) {
699690
throw new UnsupportedOperationException("This stream does not " +
700691
"support seek.");
701692
}
693+
// Have some decrypted data unread, need to reset.
694+
((Seekable) in).seek(getPos());
695+
resetStreamOffset(getPos());
702696
}
703-
if (in instanceof HasEnhancedByteBufferAccess) {
704-
final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
705-
read(bufferPool, maxLength, opts);
706-
if (buffer != null) {
707-
final int n = buffer.remaining();
708-
if (n > 0) {
709-
streamOffset += buffer.remaining(); // Read n bytes
710-
final int pos = buffer.position();
711-
decrypt(buffer, n, pos);
712-
}
713-
}
714-
return buffer;
715-
} else {
697+
if (!(in instanceof HasEnhancedByteBufferAccess)) {
716698
throw new UnsupportedOperationException("This stream does not support " +
717699
"enhanced byte buffer access.");
718700
}
701+
final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
702+
read(bufferPool, maxLength, opts);
703+
if (buffer != null) {
704+
final int n = buffer.remaining();
705+
if (n > 0) {
706+
streamOffset += buffer.remaining(); // Read n bytes
707+
final int pos = buffer.position();
708+
decrypt(buffer, n, pos);
709+
}
710+
}
711+
return buffer;
719712
}
720713

721714
@Override
722715
public void releaseBuffer(ByteBuffer buffer) {
723-
if (in instanceof HasEnhancedByteBufferAccess) {
724-
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
725-
} else {
716+
if (!(in instanceof HasEnhancedByteBufferAccess)) {
726717
throw new UnsupportedOperationException("This stream does not support " +
727718
"release buffer.");
728719
}
720+
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
729721
}
730722

731723
@Override
732724
public void setReadahead(Long readahead) throws IOException,
733725
UnsupportedOperationException {
734-
if (in instanceof CanSetReadahead) {
735-
((CanSetReadahead) in).setReadahead(readahead);
736-
} else {
726+
if (!(in instanceof CanSetReadahead)) {
737727
throw new UnsupportedOperationException("This stream does not support " +
738728
"setting the readahead caching strategy.");
739729
}
730+
((CanSetReadahead) in).setReadahead(readahead);
740731
}
741732

742733
@Override
743734
public void setDropBehind(Boolean dropCache) throws IOException,
744735
UnsupportedOperationException {
745-
if (in instanceof CanSetReadahead) {
746-
((CanSetDropBehind) in).setDropBehind(dropCache);
747-
} else {
736+
if (!(in instanceof CanSetReadahead)) {
748737
throw new UnsupportedOperationException("This stream does not " +
749738
"support setting the drop-behind caching setting.");
750739
}
740+
((CanSetDropBehind) in).setDropBehind(dropCache);
751741
}
752742

753743
@Override
@@ -851,12 +841,11 @@ public boolean hasCapability(String capability) {
851841
case StreamCapabilities.DROPBEHIND:
852842
case StreamCapabilities.READBYTEBUFFER:
853843
case StreamCapabilities.PREADBYTEBUFFER:
854-
if (in instanceof StreamCapabilities) {
855-
return ((StreamCapabilities) in).hasCapability(capability);
856-
} else {
844+
if (!(in instanceof StreamCapabilities)) {
857845
throw new UnsupportedOperationException("This stream does not expose " +
858846
"its stream capabilities.");
859847
}
848+
return ((StreamCapabilities) in).hasCapability(capability);
860849
default:
861850
return false;
862851
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ public interface ByteBufferPositionedReadable {
4141
* should be written to.
4242
* <p>
4343
* After a successful call, {@code buf.position()} will be advanced by the
44-
* number of bytes read and {@code buf.limit()} should be unchanged.
44+
* number of bytes read and {@code buf.limit()} will be unchanged.
4545
* <p>
4646
* In the case of an exception, the state of the buffer (the contents of the
47-
* buffer, the {@code buf.position()}, the {@code buf.limit(), etc.) is
47+
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
4848
* undefined, and callers should be prepared to recover from this
4949
* eventuality.
5050
* <p>

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22+
2223
import org.apache.hadoop.classification.InterfaceAudience;
2324
import org.apache.hadoop.classification.InterfaceStability;
2425

@@ -33,11 +34,11 @@ public interface ByteBufferReadable {
3334
* Reads up to buf.remaining() bytes into buf. Callers should use
3435
* buf.limit(..) to control the size of the desired read.
3536
* <p>
36-
* After a successful call, buf.position() will be advanced by the number
37-
* of bytes read and buf.limit() should be unchanged.
37+
* After a successful call, {@code buf.position()} will be advanced by the
38+
* number of bytes read and {@code buf.limit()} will be unchanged.
3839
* <p>
3940
* In the case of an exception, the state of the buffer (the contents of the
40-
* buffer, the {@code buf.position()}, the {@code buf.limit(), etc.) is
41+
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
4142
* undefined, and callers should be prepared to recover from this
4243
* eventuality.
4344
* <p>

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,17 @@ int main(int argc, char **argv) {
260260
// read path
261261
hdfsFileDisableDirectRead(readFile);
262262

263+
if (hdfsFileUsesDirectRead(readFile)) {
264+
fprintf(stderr, "Disabled direct reads, but it is still enabled");
265+
shutdown_and_exit(cl, -1);
266+
}
267+
268+
if (!hdfsFileUsesDirectPread(readFile)) {
269+
fprintf(stderr, "Disabled direct reads, but direct preads was "
270+
"disabled as well");
271+
shutdown_and_exit(cl, -1);
272+
}
273+
263274
num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
264275
sizeof(buffer));
265276
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
@@ -353,6 +364,17 @@ int main(int argc, char **argv) {
353364
// read path
354365
hdfsFileDisableDirectPread(preadFile);
355366

367+
if (hdfsFileUsesDirectPread(preadFile)) {
368+
fprintf(stderr, "Disabled direct preads, but it is still enabled");
369+
shutdown_and_exit(cl, -1);
370+
}
371+
372+
if (!hdfsFileUsesDirectRead(preadFile)) {
373+
fprintf(stderr, "Disabled direct preads, but direct read was "
374+
"disabled as well");
375+
shutdown_and_exit(cl, -1);
376+
}
377+
356378
num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
357379
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
358380
fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",

hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
// Bit fields for hdfsFile_internal flags
4242
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
43-
#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<0)
43+
#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
4444

4545
/**
4646
* Reads bytes using the read(ByteBuffer) API. By using Java
@@ -1011,6 +1011,19 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
10111011
return 0;
10121012
}
10131013

1014+
/**
1015+
* Delegates to FsDataInputStream#hasCapability(String). Used to check if a
1016+
* given input stream supports certain methods, such as
1017+
* ByteBufferReadable#read(ByteBuffer).
1018+
*
1019+
* @param jFile the FsDataInputStream to call hasCapability on
1020+
* @param capability the name of the capability to query; for a full list of
1021+
* possible values see StreamCapabilities
1022+
*
1023+
* @return true if the given jFile has the given capability, false otherwise
1024+
*
1025+
* @see org.apache.hadoop.fs.StreamCapabilities
1026+
*/
10141027
static int hdfsHasStreamCapability(jobject jFile,
10151028
const char *capability) {
10161029
int ret = 0;
@@ -1033,7 +1046,7 @@ static int hdfsHasStreamCapability(jobject jFile,
10331046
}
10341047
jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
10351048
JC_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z",
1036-
jCapabilityString);
1049+
jCapabilityString);
10371050
if (jthr) {
10381051
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
10391052
"hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability",
@@ -1568,7 +1581,16 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
15681581
"readDirect: FSDataInputStream#read");
15691582
return -1;
15701583
}
1571-
return (jVal.i < 0) ? 0 : jVal.i;
1584+
// Reached EOF, return 0
1585+
if (jVal.i < 0) {
1586+
return 0;
1587+
}
1588+
// 0 bytes read, return error
1589+
if (jVal.i == 0) {
1590+
errno = EINTR;
1591+
return -1;
1592+
}
1593+
return jVal.i;
15721594
}
15731595

15741596
/**
@@ -1693,7 +1715,16 @@ tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
16931715
"preadDirect: FSDataInputStream#read");
16941716
return -1;
16951717
}
1696-
return (jVal.i < 0) ? 0 : jVal.i;
1718+
// Reached EOF, return 0
1719+
if (jVal.i < 0) {
1720+
return 0;
1721+
}
1722+
// 0 bytes read, return error
1723+
if (jVal.i == 0) {
1724+
errno = EINTR;
1725+
return -1;
1726+
}
1727+
return jVal.i;
16971728
}
16981729

16991730
tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)

0 commit comments

Comments
 (0)