Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,45 @@

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public final class BlockIOUtils {
// Class logger.
private static final Logger LOG =
LoggerFactory.getLogger(BlockIOUtils.class);
// Reflective handle to the positioned ByteBuffer read, FSDataInputStream#read(long, ByteBuffer).
// It is assigned by initByteBufferPositionReadableMethod() and stays null when that lookup
// does not find the method.
// TODO: remove the reflection when we update to Hadoop 3.3 or above.
private static Method byteBufferPositionedReadMethod;

// Run the reflective lookup once, when the class is initialized.
static {
initByteBufferPositionReadableMethod();
}

// Disallow instantiation by keeping the constructor private; it intentionally does nothing.
private BlockIOUtils() {

}

/**
 * Looks up the public positioned ByteBuffer read method, {@code read(long, ByteBuffer)}, on
 * {@link FSDataInputStream} through reflection and stores it in
 * {@code byteBufferPositionedReadMethod}. When no such method exists the field is not
 * assigned and a debug message is logged instead.
 */
private static void initByteBufferPositionReadableMethod() {
  final Method positionedRead;
  try {
    // Signature being looked up: read(long position, ByteBuffer buf)
    positionedRead = FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
  } catch (NoSuchMethodException e) {
    LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
      + "preadWithExtra() will use a temporary on-heap byte array.");
    return;
  }
  byteBufferPositionedReadMethod = positionedRead;
}

public static boolean isByteBufferReadable(FSDataInputStream is) {
InputStream cur = is.getWrappedStream();
for (;;) {
Expand Down Expand Up @@ -197,6 +220,10 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
* read.
*
* If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
* directly, and does not allocate a temporary byte array.
*
* @param buff ByteBuff to read into.
* @param dis the input stream to read from
* @param position the position within the stream from which to start reading
Expand All @@ -207,6 +234,17 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen) throws IOException {
  // Query the stream first so it is asked exactly once per call, whichever path is taken.
  boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");

  // The direct path calls the positioned ByteBuffer read through reflection, so it can only be
  // used when that method was found during class initialization. If the stream advertises the
  // capability but the method is missing, fall back to the on-heap copy instead of failing the
  // read; this is the behavior the lookup's debug message promises.
  if (preadbytebuffer && byteBufferPositionedReadMethod != null) {
    return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
  }
  return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
}

private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
int remain = necessaryLen + extraLen;
byte[] buf = new byte[remain];
int bytesRead = 0;
Expand All @@ -220,15 +258,49 @@ public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long
bytesRead += ret;
remain -= ret;
}
// Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
// will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
// TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
// preadWithExtra method for the upper layer, only need to refactor this method if the
// ByteBuffer pread is OK.
copyToByteBuff(buf, 0, bytesRead, buff);
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}

/**
 * Positional read straight into the NIO buffers backing {@code buff}, using the reflectively
 * resolved {@code FSDataInputStream.read(long, ByteBuffer)} so that no temporary byte array is
 * allocated. Reads are repeated until at least {@code necessaryLen} bytes have been read.
 *
 * @param buff ByteBuff to read into
 * @param dis the input stream to read from
 * @param position the position within the stream from which to start reading
 * @param necessaryLen the number of bytes that must be read
 * @param extraLen the number of additional bytes that are read if the stream provides them
 * @return true if and only if extraLen is > 0 and all necessary and extra bytes were read
 * @throws IOException if the positioned read method is unavailable or its invocation fails,
 *           if the backing buffers run out of space, or if the stream ends before
 *           necessaryLen bytes were read
 */
private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
    int necessaryLen, int extraLen) throws IOException {
  int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
  ByteBuffer[] buffers = buff.nioByteBuffers();
  ByteBuffer cur = buffers[idx];
  while (bytesRead < necessaryLen) {
    // Advance to the next backing buffer once the current one has no room left.
    while (!cur.hasRemaining()) {
      if (++idx >= buffers.length) {
        throw new IOException(
          "Not enough ByteBuffers to read the remaining " + remain + " bytes");
      }
      cur = buffers[idx];
    }
    // Cap the limit so a single read cannot deliver more than is still wanted.
    // NOTE(review): the limit is not restored afterwards; confirm callers do not rely on it.
    cur.limit(cur.position() + Math.min(remain, cur.remaining()));
    // The handle is null when the reflective lookup found no read(long, ByteBuffer); report
    // that explicitly rather than relying on a NullPointerException from invoke().
    if (byteBufferPositionedReadMethod == null) {
      throw new IOException("Positioned ByteBuffer read method of FSDataInputStream was not "
        + "found; unable to read directly into the ByteBuffer from position " + position);
    }
    int ret;
    try {
      ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
    } catch (IllegalAccessException e) {
      throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
        + bytesRead + " bytes from position " + position, e);
    } catch (InvocationTargetException e) {
      // The stream's own failure is kept as the cause of the wrapping exception.
      throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
        + " when trying to read " + bytesRead + " bytes from position " + position, e);
    }
    if (ret < 0) {
      throw new IOException("Premature EOF from inputStream (positional read returned " + ret
        + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
        + " extra bytes, successfully read " + bytesRead);
    }
    bytesRead += ret;
    remain -= ret;
  }

  return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}

private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
throws IOException {
if (offset < 0 || len < 0 || offset + len > buf.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -138,9 +141,11 @@ public void testPositionalReadNoExtra() throws IOException {
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
}

Expand All @@ -156,10 +161,12 @@ public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 5)).thenReturn(5);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 5);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
}

Expand All @@ -174,9 +181,11 @@ public void testPositionalReadExtraSucceeded() throws IOException {
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
}

Expand All @@ -191,9 +200,11 @@ public void testPositionalReadExtraFailed() throws IOException {
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertFalse("Expect false return when reading extra bytes fails", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
}

Expand All @@ -210,10 +221,12 @@ public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes()
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 10)).thenReturn(10);
when(in.hasCapability(anyString())).thenReturn(false);
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 10);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
}

Expand All @@ -229,8 +242,89 @@ public void testPositionalReadPrematureEOF() throws IOException {
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
when(in.hasCapability(anyString())).thenReturn(false);
exception.expect(IOException.class);
exception.expectMessage("EOF");
BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
}

/**
 * Checks through reflection whether {@link FSDataInputStream} exposes the public positioned
 * ByteBuffer read method {@code read(long, ByteBuffer)}.
 *
 * @return true if the method is found on FSDataInputStream, false otherwise.
 */
private boolean isByteBufferPositionedReadable() {
  boolean available = true;
  try {
    // Signature being probed: read(long position, ByteBuffer buf)
    FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
  } catch (NoSuchMethodException e) {
    available = false;
  }
  return available;
}

/**
 * FSDataInputStream subclass used as the mock target in the ByteBuffer positioned read tests.
 */
public static class MyFSDataInputStream extends FSDataInputStream {
// Hands the wrapped stream straight to the FSDataInputStream constructor.
public MyFSDataInputStream(InputStream in) {
super(in);
}

// This is the ByteBufferPositionedReadable API we want to test.
// Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop
// does not implement the interface, and it wouldn't compile trying to mock the method.
// So explicitly declare the method here to make mocking possible.
// The body is only a placeholder returning 0; the tests stub this method with Mockito.
public int read(long position, ByteBuffer buf) throws IOException {
return 0;
}
}

/**
 * Exercises the direct ByteBuffer path of preadWithExtra: the stream reports the capability,
 * the first positioned read is short, and a second read delivers the rest including the
 * extra byte.
 */
@Test
public void testByteBufferPositionedReadable() throws IOException {
  assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
    isByteBufferPositionedReadable());

  final long startPos = 0;
  final int required = 10;
  final int optional = 1;
  final int total = required + optional;
  final int firstChunk = 6;
  // The second read returns everything the first one left over.
  final int secondChunk = total - firstChunk;

  final ByteBuffer target = ByteBuffer.allocate(total);
  final ByteBuff wrapped = new SingleByteBuff(target);
  final MyFSDataInputStream stream = mock(MyFSDataInputStream.class);

  // Stubbing: capability is advertised, then two consecutive short reads.
  when(stream.hasCapability(anyString())).thenReturn(true);
  when(stream.read(startPos, target)).thenReturn(firstChunk);
  when(stream.read(firstChunk, target)).thenReturn(secondChunk);

  assertTrue("Expect true return when reading extra bytes succeeds",
    BlockIOUtils.preadWithExtra(wrapped, stream, startPos, required, optional));

  // Exactly one capability query and the two positioned reads, nothing else.
  verify(stream).hasCapability(anyString());
  verify(stream).read(startPos, target);
  verify(stream).read(firstChunk, target);
  verifyNoMoreInteractions(stream);
}

/**
 * Verifies that the direct ByteBuffer path of preadWithExtra reports a premature EOF: the
 * first positioned read returns fewer bytes than necessary and the follow-up read returns -1.
 */
@Test
public void testByteBufferPositionedReadableEOF() throws IOException {
  assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
    isByteBufferPositionedReadable());
  long position = 0;
  int necessaryLen = 10;
  int extraLen = 0;
  int totalLen = necessaryLen + extraLen;
  int firstReadLen = 9;
  ByteBuffer buf = ByteBuffer.allocate(totalLen);
  ByteBuff bb = new SingleByteBuff(buf);
  MyFSDataInputStream in = mock(MyFSDataInputStream.class);

  when(in.read(position, buf)).thenReturn(firstReadLen);
  // The follow-up read starts where the short read stopped. Stubbing the same position a
  // second time would only replace the first answer and skip the short-read case entirely.
  when(in.read(firstReadLen, buf)).thenReturn(-1);
  when(in.hasCapability(anyString())).thenReturn(true);
  exception.expect(IOException.class);
  exception.expectMessage("EOF");
  try {
    BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
  } finally {
    // preadWithExtra is expected to throw, so statements placed after the call would never
    // run; the interaction checks therefore live in a finally block.
    verify(in).read(position, buf);
    verify(in).read(firstReadLen, buf);
    verify(in).hasCapability(anyString());
    verifyNoMoreInteractions(in);
  }
}
}