Skip to content

Commit d7fe8db

Browse files
committed
HBASE-27668 PB's parseDelimitedFrom can successfully return when there are not enough bytes
1 parent 4a9cf99 commit d7fe8db

File tree

4 files changed

+147
-39
lines changed

4 files changed

+147
-39
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
2121

2222
import java.io.ByteArrayOutputStream;
23+
import java.io.EOFException;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.lang.reflect.Constructor;
@@ -137,6 +138,7 @@
137138
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
138139
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
139140
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
141+
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
140142
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
141143
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
142144
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
37003702
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
37013703
}
37023704

3705+
/**
3706+
* Check whether this IPBE indicates EOF or not.
3707+
* <p/>
3708+
* We will check the exception message, if it is likely the one of
3709+
* InvalidProtocolBufferException.truncatedMessage, we will consider it as EOF, otherwise not.
3710+
*/
3711+
public static boolean isEOF(InvalidProtocolBufferException e) {
3712+
return e.getMessage().contains("input has been truncated");
3713+
}
3714+
3715+
/**
3716+
* This is a wrapper of the PB message's parseDelimitedFrom. The difference is, if we can not
3717+
* determine whether there are enough bytes in stream, i.e, the available method does not have a
3718+
* valid return value, we will try to read all the bytes to a byte array first, and then parse the
3719+
* pb message with {@link Parser#parseFrom(byte[])} instead of call
3720+
* {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because even if the bytes are
3721+
* not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without any
3722+
* errors but just leave us a partial PB message.
3723+
* @return The PB message if we can parse it successfully, otherwise there will always be an
3724+
* exception thrown, will never return {@code null}.
3725+
*/
3726+
public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
3727+
throws IOException {
3728+
int firstByte = in.read();
3729+
if (firstByte < 0) {
3730+
throw new EOFException("EOF while reading message size");
3731+
}
3732+
int size = CodedInputStream.readRawVarint32(firstByte, in);
3733+
int available = in.available();
3734+
if (available > 0) {
3735+
if (available < size) {
3736+
throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
3737+
+ size + " bytes, but only " + available + " bytes available");
3738+
}
3739+
// this piece of code is copied from GeneratedMessageV3.parseFrom
3740+
try {
3741+
return parser.parseFrom(ByteStreams.limit(in, size));
3742+
} catch (InvalidProtocolBufferException e) {
3743+
throw e.unwrapIOException();
3744+
}
3745+
} else {
3746+
// this usually means the stream does not have a proper available implementation, let's read
3747+
// the content to an byte array before parsing.
3748+
byte[] bytes = new byte[size];
3749+
ByteStreams.readFully(in, bytes);
3750+
return parser.parseFrom(bytes);
3751+
}
3752+
}
37033753
}

hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertTrue;
2323

2424
import java.io.IOException;
25+
import java.lang.reflect.Method;
2526
import java.nio.ByteBuffer;
2627
import java.util.Collections;
2728
import java.util.List;
@@ -53,6 +54,7 @@
5354
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
5455
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
5556
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
57+
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
5658

5759
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
5860
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -574,4 +576,21 @@ public void testTagEncodeTrueDecodeFalse() {
574576
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
575577
assertEquals(0, decodedTags.size());
576578
}
579+
580+
/**
581+
* Used to confirm that we only consider truncatedMessage as EOF
582+
*/
583+
@Test
584+
public void testIsEOF() throws Exception {
585+
for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
586+
if (
587+
method.getParameterCount() == 0
588+
&& method.getReturnType() == InvalidProtocolBufferException.class
589+
) {
590+
method.setAccessible(true);
591+
InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
592+
assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
593+
}
594+
}
595+
}
577596
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;
3838

39-
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
40-
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
4139
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
4240

4341
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -358,60 +356,46 @@ protected Compression.Algorithm getValueCompressionAlgorithm() {
358356

359357
@Override
360358
protected boolean readNext(Entry entry) throws IOException {
359+
resetCompression = false;
361360
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
362361
long originalPosition = this.inputStream.getPos();
363362
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
364363
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
365364
return false;
366365
}
367-
WALKey.Builder builder = WALKey.newBuilder();
368-
long size = 0;
369366
boolean resetPosition = false;
370-
// by default, we should reset the compression when seeking back after reading something
371-
resetCompression = true;
372367
try {
373-
long available = -1;
368+
WALKey walKey;
374369
try {
375-
int firstByte = this.inputStream.read();
376-
if (firstByte == -1) {
377-
throw new EOFException();
378-
}
379-
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
380-
// available may be < 0 on local fs for instance. If so, can't depend on it.
381-
available = this.inputStream.available();
382-
if (available > 0 && available < size) {
383-
// if we quit here, we have just read the length, no actual data yet, which means we
384-
// haven't put anything into the compression dictionary yet, so when seeking back to the
385-
// last good position, we do not need to reset compression context.
386-
// This is very useful for saving the extra effort for reconstructing the compression
387-
// dictionary, where we need to read from the beginning instead of just seek to the
388-
// position, as DFSInputStream implement the available method, so in most cases we will
389-
// reach here if there are not enough data.
390-
resetCompression = false;
391-
throw new EOFException("Available stream not enough for edit, "
392-
+ "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
393-
+ size + " at offset = " + this.inputStream.getPos());
370+
walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
371+
} catch (InvalidProtocolBufferException e) {
372+
if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
373+
// only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
374+
resetPosition = true;
375+
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
376+
+ originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
377+
} else {
378+
throw e;
394379
}
395-
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
396-
} catch (InvalidProtocolBufferException ipbe) {
397-
resetPosition = true;
398-
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
399-
+ originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
400-
+ size + ", currentAvailable=" + available).initCause(ipbe);
380+
} catch (EOFException e) {
381+
// append more detailed information
382+
throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
383+
+ originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
401384
}
402-
if (!builder.isInitialized()) {
403-
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
404-
// If we can get the KV count, we could, theoretically, try to get next record.
405-
throw new EOFException("Partial PB while reading WAL, "
406-
+ "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
407-
}
408-
WALKey walKey = builder.build();
409385
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
410386
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
411387
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
412388
this.inputStream.getPos());
413389
return true;
414390
}
391+
// Starting from here, we will start to read cells, which will change the content in
392+
// compression dictionary, so if we fail in the below operations, when resetting, we also need
393+
// to clear the compression context, and read from the beginning to reconstruct the
394+
// compression dictionary, instead of seeking to the position directly.
395+
// This is very useful for saving the extra effort for reconstructing the compression
396+
// dictionary, as DFSInputStream implement the available method, so in most cases we will
397+
// not reach here if there are not enough data.
398+
resetCompression = true;
415399
int expectedCells = walKey.getFollowingKvCount();
416400
long posBefore = this.inputStream.getPos();
417401
try {
@@ -490,6 +474,54 @@ private IOException extractHiddenEof(Exception ex) {
490474
return null;
491475
}
492476

477+
/**
478+
* This is used to determine whether we have already reached the WALTrailer. As the size and magic
479+
* are at the end of the WAL file, it is possible that these two options are missing while
480+
* writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
481+
* will try to decode it as WALKey and we will fail but the error could be vary as it is parsing
482+
* WALTrailer actually.
483+
* @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
484+
*/
485+
private boolean isWALTrailer(long startPosition) throws IOException {
486+
// We have nothing in the WALTrailer PB message now so its size is just a int length size and a
487+
// magic at the end
488+
int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
489+
if (fileLength - startPosition >= trailerSize) {
490+
// We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
491+
// We also test for == here because if this is a valid trailer, we can read it while opening
492+
// the reader so we should not reach here
493+
return false;
494+
}
495+
inputStream.seek(startPosition);
496+
for (int i = 0; i < 4; i++) {
497+
int r = inputStream.read();
498+
if (r == -1) {
499+
// we have reached EOF while reading the length, and all bytes read are 0, so we assume this
500+
// is a partial trailer
501+
return true;
502+
}
503+
if (r != 0) {
504+
// the length is not 0, should not be a trailer
505+
return false;
506+
}
507+
}
508+
for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
509+
int r = inputStream.read();
510+
if (r == -1) {
511+
// we have reached EOF while reading the magic, and all bytes read are matched, so we assume
512+
// this is a partial trailer
513+
return true;
514+
}
515+
if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
516+
// does not match magic, should not be a trailer
517+
return false;
518+
}
519+
}
520+
// in fact we should not reach here, as this means the trailer bytes are all matched and
521+
// complete, then we should not call this method...
522+
return true;
523+
}
524+
493525
@Override
494526
protected void seekOnFs(long pos) throws IOException {
495527
this.inputStream.seek(pos);

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.regionserver.wal;
1919

20+
import static org.junit.Assert.assertEquals;
21+
2022
import java.io.IOException;
2123
import org.apache.hadoop.fs.FileStatus;
2224
import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +37,8 @@
3537
import org.junit.Test;
3638
import org.junit.rules.TestName;
3739

40+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
41+
3842
/**
3943
* WAL tests that can be reused across providers.
4044
*/
@@ -89,6 +93,9 @@ public static void tearDownAfterClass() throws Exception {
8993
*/
9094
@Test
9195
public void testWALTrailer() throws IOException {
96+
// make sure that the size for WALTrailer is 0, we need this assumption when reading partial
97+
// WALTrailer
98+
assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
9299
// read With trailer.
93100
doRead(true);
94101
// read without trailer

0 commit comments

Comments
 (0)