Skip to content

Commit 4a3c7d7

Browse files
authored
HBASE-21946 Use ByteBuffer pread instead of byte[] pread in HFileBlock when applicable (#3434)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent c74366c commit 4a3c7d7

File tree

2 files changed

+171
-5
lines changed

2 files changed

+171
-5
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,45 @@
2020

2121
import java.io.IOException;
2222
import java.io.InputStream;
23+
import java.lang.reflect.InvocationTargetException;
24+
import java.lang.reflect.Method;
2325
import java.nio.ByteBuffer;
2426

2527
import org.apache.hadoop.fs.ByteBufferReadable;
2628
import org.apache.hadoop.fs.FSDataInputStream;
2729
import org.apache.hadoop.hbase.nio.ByteBuff;
2830
import org.apache.hadoop.io.IOUtils;
2931
import org.apache.yetus.audience.InterfaceAudience;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3034

3135
@InterfaceAudience.Private
3236
public final class BlockIOUtils {
37+
private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class);

/**
 * Reflective handle to {@code FSDataInputStream#read(long, ByteBuffer)}. Stays {@code null}
 * when the lookup performed by the static initializer does not find the method.
 */
// TODO: remove the reflection when we update to Hadoop 3.3 or above.
private static Method byteBufferPositionedReadMethod;

// Resolve the reflective handle once, at class-initialization time.
static {
  initByteBufferPositionReadableMethod();
}
3345

3446
/** Private constructor: this class only exposes static helpers and is never instantiated. */
private BlockIOUtils() {
}
3850

51+
private static void initByteBufferPositionReadableMethod() {
52+
try {
53+
//long position, ByteBuffer buf
54+
byteBufferPositionedReadMethod = FSDataInputStream.class.getMethod("read", long.class,
55+
ByteBuffer.class);
56+
} catch (NoSuchMethodException e) {
57+
LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
58+
+ "preadWithExtra() will use a temporary on-heap byte array.");
59+
}
60+
}
61+
3962
public static boolean isByteBufferReadable(FSDataInputStream is) {
4063
InputStream cur = is.getWrappedStream();
4164
for (;;) {
@@ -197,6 +220,10 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
197220
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
198221
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
199222
* read.
223+
*
224+
* If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
225+
* directly, and does not allocate a temporary byte array.
226+
*
200227
* @param buff ByteBuff to read into.
201228
* @param dis the input stream to read from
202229
* @param position the position within the stream from which to start reading
@@ -207,6 +234,17 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
207234
*/
208235
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
209236
int necessaryLen, int extraLen) throws IOException {
237+
boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
238+
239+
if (preadbytebuffer) {
240+
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
241+
} else {
242+
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
243+
}
244+
}
245+
246+
private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
247+
int necessaryLen, int extraLen) throws IOException {
210248
int remain = necessaryLen + extraLen;
211249
byte[] buf = new byte[remain];
212250
int bytesRead = 0;
@@ -220,15 +258,49 @@ public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long
220258
bytesRead += ret;
221259
remain -= ret;
222260
}
223-
// Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
224-
// will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
225-
// TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
226-
// preadWithExtra method for the upper layer, only need to refactor this method if the
227-
// ByteBuffer pread is OK.
228261
copyToByteBuff(buf, 0, bytesRead, buff);
229262
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
230263
}
231264

265+
private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
266+
int necessaryLen, int extraLen) throws IOException {
267+
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
268+
ByteBuffer[] buffers = buff.nioByteBuffers();
269+
ByteBuffer cur = buffers[idx];
270+
while (bytesRead < necessaryLen) {
271+
int ret;
272+
while (!cur.hasRemaining()) {
273+
if (++idx >= buffers.length) {
274+
throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes");
275+
}
276+
cur = buffers[idx];
277+
}
278+
cur.limit(cur.position() + Math.min(remain, cur.remaining()));
279+
try {
280+
ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
281+
} catch (IllegalAccessException e) {
282+
throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
283+
+ bytesRead + " bytes from position " + position, e);
284+
} catch (InvocationTargetException e) {
285+
throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
286+
+ " when trying to read " + bytesRead + " bytes from position " + position, e);
287+
} catch (NullPointerException e) {
288+
throw new IOException("something is null");
289+
} catch (Exception e) {
290+
throw e;
291+
}
292+
if (ret < 0) {
293+
throw new IOException("Premature EOF from inputStream (positional read returned " + ret
294+
+ ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
295+
+ " extra bytes, successfully read " + bytesRead);
296+
}
297+
bytesRead += ret;
298+
remain -= ret;
299+
}
300+
301+
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
302+
}
303+
232304
private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
233305
throws IOException {
234306
if (offset < 0 || len < 0 || offset + len > buf.length) {

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import static org.junit.Assert.assertFalse;
2222
import static org.junit.Assert.assertTrue;
2323
import static org.junit.Assert.fail;
24+
import static org.junit.Assume.assumeTrue;
25+
import static org.mockito.ArgumentMatchers.anyString;
2426
import static org.mockito.Mockito.mock;
2527
import static org.mockito.Mockito.verify;
2628
import static org.mockito.Mockito.verifyNoMoreInteractions;
2729
import static org.mockito.Mockito.when;
2830

2931
import java.io.IOException;
32+
import java.io.InputStream;
3033
import java.nio.ByteBuffer;
3134

3235
import org.apache.hadoop.fs.FSDataInputStream;
@@ -138,9 +141,11 @@ public void testPositionalReadNoExtra() throws IOException {
138141
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
139142
FSDataInputStream in = mock(FSDataInputStream.class);
140143
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
144+
when(in.hasCapability(anyString())).thenReturn(false);
141145
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
142146
assertFalse("Expect false return when no extra bytes requested", ret);
143147
verify(in).read(position, buf, bufOffset, totalLen);
148+
verify(in).hasCapability(anyString());
144149
verifyNoMoreInteractions(in);
145150
}
146151

@@ -156,10 +161,12 @@ public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
156161
FSDataInputStream in = mock(FSDataInputStream.class);
157162
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
158163
when(in.read(5, buf, 5, 5)).thenReturn(5);
164+
when(in.hasCapability(anyString())).thenReturn(false);
159165
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
160166
assertFalse("Expect false return when no extra bytes requested", ret);
161167
verify(in).read(position, buf, bufOffset, totalLen);
162168
verify(in).read(5, buf, 5, 5);
169+
verify(in).hasCapability(anyString());
163170
verifyNoMoreInteractions(in);
164171
}
165172

@@ -174,9 +181,11 @@ public void testPositionalReadExtraSucceeded() throws IOException {
174181
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
175182
FSDataInputStream in = mock(FSDataInputStream.class);
176183
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
184+
when(in.hasCapability(anyString())).thenReturn(false);
177185
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
178186
assertTrue("Expect true return when reading extra bytes succeeds", ret);
179187
verify(in).read(position, buf, bufOffset, totalLen);
188+
verify(in).hasCapability(anyString());
180189
verifyNoMoreInteractions(in);
181190
}
182191

@@ -191,9 +200,11 @@ public void testPositionalReadExtraFailed() throws IOException {
191200
ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
192201
FSDataInputStream in = mock(FSDataInputStream.class);
193202
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
203+
when(in.hasCapability(anyString())).thenReturn(false);
194204
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
195205
assertFalse("Expect false return when reading extra bytes fails", ret);
196206
verify(in).read(position, buf, bufOffset, totalLen);
207+
verify(in).hasCapability(anyString());
197208
verifyNoMoreInteractions(in);
198209
}
199210

@@ -210,10 +221,12 @@ public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes()
210221
FSDataInputStream in = mock(FSDataInputStream.class);
211222
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
212223
when(in.read(5, buf, 5, 10)).thenReturn(10);
224+
when(in.hasCapability(anyString())).thenReturn(false);
213225
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
214226
assertTrue("Expect true return when reading extra bytes succeeds", ret);
215227
verify(in).read(position, buf, bufOffset, totalLen);
216228
verify(in).read(5, buf, 5, 10);
229+
verify(in).hasCapability(anyString());
217230
verifyNoMoreInteractions(in);
218231
}
219232

@@ -229,8 +242,89 @@ public void testPositionalReadPrematureEOF() throws IOException {
229242
FSDataInputStream in = mock(FSDataInputStream.class);
230243
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
231244
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
245+
when(in.hasCapability(anyString())).thenReturn(false);
232246
exception.expect(IOException.class);
233247
exception.expectMessage("EOF");
234248
BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
235249
}
250+
251+
/**
252+
* Determine if ByteBufferPositionedReadable API is available
253+
* .
254+
* @return true if FSDataInputStream implements ByteBufferPositionedReadable API.
255+
*/
256+
private boolean isByteBufferPositionedReadable() {
257+
try {
258+
//long position, ByteBuffer buf
259+
FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
260+
} catch (NoSuchMethodException e) {
261+
return false;
262+
}
263+
return true;
264+
}
265+
266+
public static class MyFSDataInputStream extends FSDataInputStream {
267+
public MyFSDataInputStream(InputStream in) {
268+
super(in);
269+
}
270+
271+
// This is the ByteBufferPositionReadable API we want to test.
272+
// Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop
273+
// does not implement the interface, and it wouldn't compile trying to mock the method.
274+
// So explicitly declare the method here to make mocking possible.
275+
public int read(long position, ByteBuffer buf) throws IOException {
276+
return 0;
277+
}
278+
}
279+
280+
@Test
281+
public void testByteBufferPositionedReadable() throws IOException {
282+
assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
283+
isByteBufferPositionedReadable());
284+
long position = 0;
285+
int necessaryLen = 10;
286+
int extraLen = 1;
287+
int totalLen = necessaryLen + extraLen;
288+
int firstReadLen = 6;
289+
int secondReadLen = totalLen - firstReadLen;
290+
ByteBuffer buf = ByteBuffer.allocate(totalLen);
291+
ByteBuff bb = new SingleByteBuff(buf);
292+
MyFSDataInputStream in = mock(MyFSDataInputStream.class);
293+
294+
when(in.read(position, buf)).thenReturn(firstReadLen);
295+
when(in.read(firstReadLen, buf)).thenReturn(secondReadLen);
296+
when(in.hasCapability(anyString())).thenReturn(true);
297+
boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
298+
assertTrue("Expect true return when reading extra bytes succeeds", ret);
299+
verify(in).read(position, buf);
300+
verify(in).read(firstReadLen, buf);
301+
verify(in).hasCapability(anyString());
302+
verifyNoMoreInteractions(in);
303+
}
304+
305+
@Test
306+
public void testByteBufferPositionedReadableEOF() throws IOException {
307+
assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
308+
isByteBufferPositionedReadable());
309+
long position = 0;
310+
int necessaryLen = 10;
311+
int extraLen = 0;
312+
int totalLen = necessaryLen + extraLen;
313+
int firstReadLen = 9;
314+
ByteBuffer buf = ByteBuffer.allocate(totalLen);
315+
ByteBuff bb = new SingleByteBuff(buf);
316+
MyFSDataInputStream in = mock(MyFSDataInputStream.class);
317+
318+
when(in.read(position, buf)).thenReturn(firstReadLen);
319+
when(in.read(position, buf)).thenReturn(-1);
320+
when(in.hasCapability(anyString())).thenReturn(true);
321+
exception.expect(IOException.class);
322+
exception.expectMessage("EOF");
323+
BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
324+
325+
verify(in).read(position, buf);
326+
verify(in).read(firstReadLen, buf);
327+
verify(in).hasCapability(anyString());
328+
verifyNoMoreInteractions(in);
329+
}
236330
}

0 commit comments

Comments
 (0)