Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,108 +329,108 @@ protected boolean hasTagCompression() {

@Override
protected boolean readNext(Entry entry) throws IOException {
while (true) {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
return false;
}
WALKey.Builder builder = WALKey.newBuilder();
long size = 0;
boolean resetPosition = false;
try {
long available = -1;
try {
int firstByte = this.inputStream.read();
if (firstByte == -1) {
throw new EOFException();
}
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
throw new EOFException(
"Available stream not enough for edit, " + "inputStream.available()= "
+ this.inputStream.available() + ", " + "entry size= " + size + " at offset = "
+ this.inputStream.getPos());
}
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
} catch (InvalidProtocolBufferException ipbe) {
resetPosition = true;
throw (EOFException) new EOFException(
"Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition="
+ this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable="
+ available).initCause(ipbe);
}
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
throw new EOFException(
"Partial PB while reading WAL, " + "probably an unexpected EOF, ignoring. current offset="
+ this.inputStream.getPos());
}
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
seekOnFs(originalPosition);
return false;
}
WALKey.Builder builder = WALKey.newBuilder();
long size = 0;
boolean resetPosition = false;
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
long available = -1;
try {
int firstByte = this.inputStream.read();
if (firstByte == -1) {
throw new EOFException();
}
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
throw new EOFException("Available stream not enough for edit, " +
"inputStream.available()= " + this.inputStream.available() + ", " +
"entry size= " + size + " at offset = " + this.inputStream.getPos());
}
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size),
(int)size);
} catch (InvalidProtocolBufferException ipbe) {
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
if (expectedCells != actualCells) {
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
}
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
throw new EOFException("Partial PB while reading WAL, " +
"probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
throw new EOFException("Only read " + actualCells); // other info added in catch
}
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
seekOnFs(originalPosition);
return false;
}
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
} catch (Exception ex) {
String posAfterStr = "<unknown>";
try {
int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
if (expectedCells != actualCells) {
resetPosition = true;
throw new EOFException("Only read " + actualCells); // other info added in catch
}
} catch (Exception ex) {
String posAfterStr = "<unknown>";
try {
posAfterStr = this.inputStream.getPos() + "";
} catch (Throwable t) {
LOG.trace("Error getting pos for error message - ignoring", t);
}
String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ posBefore + " and read up to " + posAfterStr;
IOException realEofEx = extractHiddenEof(ex);
throw (EOFException) new EOFException("EOF " + message).
initCause(realEofEx != null ? realEofEx : ex);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ this.walEditsStopOffset);
throw new EOFException("Read WALTrailer while reading WALEdits");
posAfterStr = this.inputStream.getPos() + "";
} catch (Throwable t) {
LOG.trace("Error getting pos for error message - ignoring", t);
}
} catch (EOFException eof) {
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
if (originalPosition < 0) {
LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
+ "because originalPosition is negative. last offset={}",
this.inputStream.getPos(), eof);
throw eof;
}
// If stuck at the same place and we got and exception, lets go back at the beginning.
if (inputStream.getPos() == originalPosition) {
if (resetPosition) {
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since " +
"current position and original position match at {}", originalPosition);
seekOnFs(0);
} else {
LOG.debug("Reached the end of file at position {}", originalPosition);
}
String message =
" while reading " + expectedCells + " WAL KVs; started reading at " + posBefore
+ " and read up to " + posAfterStr;
IOException realEofEx = extractHiddenEof(ex);
throw (EOFException) new EOFException("EOF " + message).
initCause(realEofEx != null ? realEofEx : ex);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
LOG.error(
"Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): "
+ this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset);
throw new EOFException("Read WALTrailer while reading WALEdits");
}
} catch (EOFException eof) {
// If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
if (originalPosition < 0) {
LOG.warn("Encountered a malformed edit, but can't seek back to last good position "
+ "because originalPosition is negative. last offset={}", this.inputStream.getPos(), eof);
throw eof;
}
// If stuck at the same place and we got and exception, lets go back at the beginning.
if (inputStream.getPos() == originalPosition) {
if (resetPosition) {
LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
+ "current position and original position match at {}", originalPosition);
seekOnFs(0);
} else {
// Else restore our position to original location in hope that next time through we will
// read successfully.
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, " +
"from {} to {}", inputStream.getPos(), originalPosition, eof);
seekOnFs(originalPosition);
LOG.debug("Reached the end of file at position {}", originalPosition);
}
return false;
} else {
// Else restore our position to original location in hope that next time through we will
// read successfully.
LOG.warn("Encountered a malformed edit, seeking back to last good position in file, "
+ "from {} to {}", inputStream.getPos(), originalPosition, eof);
seekOnFs(originalPosition);
}
return true;
return false;
}
return true;
}

private IOException extractHiddenEof(Exception ex) {
Expand Down