Skip to content

HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable #963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
checkStream();
if (!(in instanceof PositionedReadable)) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned read.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support positioned read.");
}
final int n = ((PositionedReadable) in).read(position, buffer, offset,
length);
Expand All @@ -351,8 +351,8 @@ public int read(long position, final ByteBuffer buf)
throws IOException {
checkStream();
if (!(in instanceof ByteBufferPositionedReadable)) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned reads with byte buffers.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support positioned reads with byte buffers.");
}
int bufPos = buf.position();
final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
Expand All @@ -363,7 +363,27 @@ public int read(long position, final ByteBuffer buf)

return n;
}


/**
* Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
*/
@Override
public void readFully(long position, final ByteBuffer buf)
throws IOException {
checkStream();
if (!(in instanceof ByteBufferPositionedReadable)) {
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support positioned reads with byte buffers.");
}
int bufPos = buf.position();
((ByteBufferPositionedReadable) in).readFully(position, buf);
final int n = buf.position() - bufPos;
if (n > 0) {
// This operation does not change the current offset of the file
decrypt(position, buf, n, bufPos);
}
}

/**
* Decrypt length bytes in buffer starting at offset. Output is also put
* into buffer starting at offset. It is thread-safe.
Expand Down Expand Up @@ -480,8 +500,8 @@ public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
checkStream();
if (!(in instanceof PositionedReadable)) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned readFully.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support positioned readFully.");
}
((PositionedReadable) in).readFully(position, buffer, offset, length);
if (length > 0) {
Expand Down Expand Up @@ -513,8 +533,8 @@ public void seek(long pos) throws IOException {
}
} else {
if (!(in instanceof Seekable)) {
throw new UnsupportedOperationException("This stream does not " +
"support seek.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support seek.");
}
((Seekable) in).seek(pos);
resetStreamOffset(pos);
Expand Down Expand Up @@ -672,8 +692,8 @@ public boolean seekToNewSource(long targetPos) throws IOException {
"Cannot seek to negative offset.");
checkStream();
if (!(in instanceof Seekable)) {
throw new UnsupportedOperationException("This stream does not support " +
"seekToNewSource.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support seekToNewSource.");
}
boolean result = ((Seekable) in).seekToNewSource(targetPos);
resetStreamOffset(targetPos);
Expand All @@ -687,16 +707,16 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
checkStream();
if (outBuffer.remaining() > 0) {
if (!(in instanceof Seekable)) {
throw new UnsupportedOperationException("This stream does not " +
"support seek.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support seek.");
}
// Have some decrypted data unread, need to reset.
((Seekable) in).seek(getPos());
resetStreamOffset(getPos());
}
if (!(in instanceof HasEnhancedByteBufferAccess)) {
throw new UnsupportedOperationException("This stream does not support " +
"enhanced byte buffer access.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support enhanced byte buffer access.");
}
final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
read(bufferPool, maxLength, opts);
Expand All @@ -714,8 +734,8 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (!(in instanceof HasEnhancedByteBufferAccess)) {
throw new UnsupportedOperationException("This stream does not support " +
"release buffer.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support release buffer.");
}
((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
}
Expand All @@ -724,8 +744,8 @@ public void releaseBuffer(ByteBuffer buffer) {
public void setReadahead(Long readahead) throws IOException,
UnsupportedOperationException {
if (!(in instanceof CanSetReadahead)) {
throw new UnsupportedOperationException("This stream does not support " +
"setting the readahead caching strategy.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not support setting the readahead caching strategy.");
}
((CanSetReadahead) in).setReadahead(readahead);
}
Expand All @@ -734,8 +754,9 @@ public void setReadahead(Long readahead) throws IOException,
public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
if (!(in instanceof CanSetReadahead)) {
throw new UnsupportedOperationException("This stream does not " +
"support setting the drop-behind caching setting.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " stream does not support setting the drop-behind caching"
+ " setting.");
}
((CanSetDropBehind) in).setDropBehind(dropCache);
}
Expand Down Expand Up @@ -842,8 +863,8 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.READBYTEBUFFER:
case StreamCapabilities.PREADBYTEBUFFER:
if (!(in instanceof StreamCapabilities)) {
throw new UnsupportedOperationException("This stream does not expose " +
"its stream capabilities.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ " does not expose its stream capabilities.");
}
return ((StreamCapabilities) in).hasCapability(capability);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

Expand Down Expand Up @@ -55,6 +56,8 @@ public interface ByteBufferPositionedReadable {
* <p>
* Implementations should treat 0-length requests as legitimate, and must not
* signal an error upon their receipt.
* <p>
* This does not change the current offset of a file, and is thread-safe.
*
* @param position position within file
* @param buf the ByteBuffer to receive the results of the read operation.
Expand All @@ -63,4 +66,25 @@ public interface ByteBufferPositionedReadable {
* @throws IOException if there is some error performing the read
*/
int read(long position, ByteBuffer buf) throws IOException;

/**
* Reads {@code buf.remaining()} bytes into buf from a given position in
* the file or until the end of the data was reached before the read
* operation completed. Callers should use {@code buf.limit(...)} to
* control the size of the desired read and {@code buf.position(...)} to
* control the offset into the buffer the data should be written to.
* <p>
* This operation provides similar semantics to
* {@link #read(long, ByteBuffer)}, the difference is that this method is
* guaranteed to read data until the {@link ByteBuffer} is full, or until
* the end of the data stream is reached.
*
* @param position position within file
* @param buf the ByteBuffer to receive the results of the read operation.
* @throws IOException if there is some error performing the read
* @throws EOFException the end of the data was reached before
* the read operation completed
* @see #read(long, ByteBuffer)
*/
void readFully(long position, ByteBuffer buf) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class FSDataInputStream extends DataInputStream
public FSDataInputStream(InputStream in) {
super(in);
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
throw new IllegalArgumentException(
"In is not an instance of Seekable or PositionedReadable");
throw new IllegalArgumentException(in.getClass().getCanonicalName() +
" is not an instance of Seekable or PositionedReadable");
}
}

Expand Down Expand Up @@ -150,7 +150,7 @@ public int read(ByteBuffer buf) throws IOException {
}

throw new UnsupportedOperationException("Byte-buffer read unsupported " +
"by input stream");
"by " + in.getClass().getCanonicalName());
}

@Override
Expand All @@ -170,9 +170,8 @@ public void setReadahead(Long readahead)
try {
((CanSetReadahead)in).setReadahead(readahead);
} catch (ClassCastException e) {
throw new UnsupportedOperationException(
"this stream does not support setting the readahead " +
"caching strategy.");
throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
" does not support setting the readahead caching strategy.");
}
}

Expand Down Expand Up @@ -256,6 +255,16 @@ public int read(long position, ByteBuffer buf) throws IOException {
return ((ByteBufferPositionedReadable) in).read(position, buf);
}
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
"by input stream");
"by " + in.getClass().getCanonicalName());
}

@Override
public void readFully(long position, ByteBuffer buf) throws IOException {
if (in instanceof ByteBufferPositionedReadable) {
((ByteBufferPositionedReadable) in).readFully(position, buf);
} else {
throw new UnsupportedOperationException("Byte-buffer pread " +
"unsupported by " + in.getClass().getCanonicalName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,42 +388,41 @@ private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
Assert.assertArrayEquals(readData, expectedData);
}

/** Test read fully */
/** Test read fully. */
@Test(timeout=120000)
public void testReadFully() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
writeData(out);

InputStream in = getInputStream(defaultBufferSize);
final int len1 = dataLen / 4;
// Read len1 bytes
byte[] readData = new byte[len1];
readAll(in, readData, 0, len1);
byte[] expectedData = new byte[len1];
System.arraycopy(data, 0, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

// Pos: 1/3 dataLen
readFullyCheck(in, dataLen / 3);

// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

// Pos: 1/2 dataLen
readFullyCheck(in, dataLen / 2);

// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

in.close();
try (InputStream in = getInputStream(defaultBufferSize)) {
final int len1 = dataLen / 4;
// Read len1 bytes
byte[] readData = new byte[len1];
readAll(in, readData, 0, len1);
byte[] expectedData = new byte[len1];
System.arraycopy(data, 0, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

// Pos: 1/3 dataLen
readFullyCheck(in, dataLen / 3);

// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

// Pos: 1/2 dataLen
readFullyCheck(in, dataLen / 2);

// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
}
}

private void readFullyCheck(InputStream in, int pos) throws Exception {
Expand All @@ -441,6 +440,60 @@ private void readFullyCheck(InputStream in, int pos) throws Exception {
} catch (EOFException e) {
}
}

/** Test byte byffer read fully. */
@Test(timeout=120000)
public void testByteBufferReadFully() throws Exception {
OutputStream out = getOutputStream(defaultBufferSize);
writeData(out);

try (InputStream in = getInputStream(defaultBufferSize)) {
final int len1 = dataLen / 4;
// Read len1 bytes
byte[] readData = new byte[len1];
readAll(in, readData, 0, len1);
byte[] expectedData = new byte[len1];
System.arraycopy(data, 0, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

// Pos: 1/3 dataLen
byteBufferReadFullyCheck(in, dataLen / 3);

// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);

// Pos: 1/2 dataLen
byteBufferReadFullyCheck(in, dataLen / 2);

// Read len1 bytes
readData = new byte[len1];
readAll(in, readData, 0, len1);
expectedData = new byte[len1];
System.arraycopy(data, 2 * len1, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
}
}

private void byteBufferReadFullyCheck(InputStream in, int pos)
throws Exception {
ByteBuffer result = ByteBuffer.allocate(dataLen - pos);
((ByteBufferPositionedReadable) in).readFully(pos, result);

byte[] expectedData = new byte[dataLen - pos];
System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
Assert.assertArrayEquals(result.array(), expectedData);

result = ByteBuffer.allocate(dataLen); // Exceeds maximum length
try {
((ByteBufferPositionedReadable) in).readFully(pos, result);
Assert.fail("Read fully exceeds maximum length should fail.");
} catch (EOFException e) {
}
}

/** Test seek to different position. */
@Test(timeout=120000)
Expand Down
Loading