@@ -329,108 +329,108 @@ protected boolean hasTagCompression() {
329329
330330 @ Override
331331 protected boolean readNext (Entry entry ) throws IOException {
332- while (true ) {
333- // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
334- long originalPosition = this .inputStream .getPos ();
335- if (trailerPresent && originalPosition > 0 && originalPosition == this .walEditsStopOffset ) {
336- LOG .trace ("Reached end of expected edits area at offset {}" , originalPosition );
332+ // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
333+ long originalPosition = this .inputStream .getPos ();
334+ if (trailerPresent && originalPosition > 0 && originalPosition == this .walEditsStopOffset ) {
335+ LOG .trace ("Reached end of expected edits area at offset {}" , originalPosition );
336+ return false ;
337+ }
338+ WALKey .Builder builder = WALKey .newBuilder ();
339+ long size = 0 ;
340+ boolean resetPosition = false ;
341+ try {
342+ long available = -1 ;
343+ try {
344+ int firstByte = this .inputStream .read ();
345+ if (firstByte == -1 ) {
346+ throw new EOFException ();
347+ }
348+ size = CodedInputStream .readRawVarint32 (firstByte , this .inputStream );
349+ // available may be < 0 on local fs for instance. If so, can't depend on it.
350+ available = this .inputStream .available ();
351+ if (available > 0 && available < size ) {
352+ throw new EOFException (
353+ "Available stream not enough for edit, " + "inputStream.available()= "
354+ + this .inputStream .available () + ", " + "entry size= " + size + " at offset = "
355+ + this .inputStream .getPos ());
356+ }
357+ ProtobufUtil .mergeFrom (builder , ByteStreams .limit (this .inputStream , size ), (int ) size );
358+ } catch (InvalidProtocolBufferException ipbe ) {
359+ resetPosition = true ;
360+ throw (EOFException ) new EOFException (
361+ "Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition="
362+ + this .inputStream .getPos () + ", messageSize=" + size + ", currentAvailable="
363+ + available ).initCause (ipbe );
364+ }
365+ if (!builder .isInitialized ()) {
366+ // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
367+ // If we can get the KV count, we could, theoretically, try to get next record.
368+ throw new EOFException (
369+ "Partial PB while reading WAL, " + "probably an unexpected EOF, ignoring. current offset="
370+ + this .inputStream .getPos ());
371+ }
372+ WALKey walKey = builder .build ();
373+ entry .getKey ().readFieldsFromPb (walKey , this .byteStringUncompressor );
374+ if (!walKey .hasFollowingKvCount () || 0 == walKey .getFollowingKvCount ()) {
375+ LOG .trace ("WALKey has no KVs that follow it; trying the next one. current offset={}" ,
376+ this .inputStream .getPos ());
377+ seekOnFs (originalPosition );
337378 return false ;
338379 }
339- WALKey .Builder builder = WALKey .newBuilder ();
340- long size = 0 ;
341- boolean resetPosition = false ;
380+ int expectedCells = walKey .getFollowingKvCount ();
381+ long posBefore = this .inputStream .getPos ();
342382 try {
343- long available = -1 ;
344- try {
345- int firstByte = this .inputStream .read ();
346- if (firstByte == -1 ) {
347- throw new EOFException ();
348- }
349- size = CodedInputStream .readRawVarint32 (firstByte , this .inputStream );
350- // available may be < 0 on local fs for instance. If so, can't depend on it.
351- available = this .inputStream .available ();
352- if (available > 0 && available < size ) {
353- throw new EOFException ("Available stream not enough for edit, " +
354- "inputStream.available()= " + this .inputStream .available () + ", " +
355- "entry size= " + size + " at offset = " + this .inputStream .getPos ());
356- }
357- ProtobufUtil .mergeFrom (builder , ByteStreams .limit (this .inputStream , size ),
358- (int )size );
359- } catch (InvalidProtocolBufferException ipbe ) {
383+ int actualCells = entry .getEdit ().readFromCells (cellDecoder , expectedCells );
384+ if (expectedCells != actualCells ) {
360385 resetPosition = true ;
361- throw (EOFException ) new EOFException ("Invalid PB, EOF? Ignoring; originalPosition=" +
362- originalPosition + ", currentPosition=" + this .inputStream .getPos () +
363- ", messageSize=" + size + ", currentAvailable=" + available ).initCause (ipbe );
364- }
365- if (!builder .isInitialized ()) {
366- // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
367- // If we can get the KV count, we could, theoretically, try to get next record.
368- throw new EOFException ("Partial PB while reading WAL, " +
369- "probably an unexpected EOF, ignoring. current offset=" + this .inputStream .getPos ());
386+ throw new EOFException ("Only read " + actualCells ); // other info added in catch
370387 }
371- WALKey walKey = builder .build ();
372- entry .getKey ().readFieldsFromPb (walKey , this .byteStringUncompressor );
373- if (!walKey .hasFollowingKvCount () || 0 == walKey .getFollowingKvCount ()) {
374- LOG .trace ("WALKey has no KVs that follow it; trying the next one. current offset={}" ,
375- this .inputStream .getPos ());
376- seekOnFs (originalPosition );
377- return false ;
378- }
379- int expectedCells = walKey .getFollowingKvCount ();
380- long posBefore = this .inputStream .getPos ();
388+ } catch (Exception ex ) {
389+ String posAfterStr = "<unknown>" ;
381390 try {
382- int actualCells = entry .getEdit ().readFromCells (cellDecoder , expectedCells );
383- if (expectedCells != actualCells ) {
384- resetPosition = true ;
385- throw new EOFException ("Only read " + actualCells ); // other info added in catch
386- }
387- } catch (Exception ex ) {
388- String posAfterStr = "<unknown>" ;
389- try {
390- posAfterStr = this .inputStream .getPos () + "" ;
391- } catch (Throwable t ) {
392- LOG .trace ("Error getting pos for error message - ignoring" , t );
393- }
394- String message = " while reading " + expectedCells + " WAL KVs; started reading at "
395- + posBefore + " and read up to " + posAfterStr ;
396- IOException realEofEx = extractHiddenEof (ex );
397- throw (EOFException ) new EOFException ("EOF " + message ).
398- initCause (realEofEx != null ? realEofEx : ex );
399- }
400- if (trailerPresent && this .inputStream .getPos () > this .walEditsStopOffset ) {
401- LOG .error ("Read WALTrailer while reading WALEdits. wal: " + this .path
402- + ", inputStream.getPos(): " + this .inputStream .getPos () + ", walEditsStopOffset: "
403- + this .walEditsStopOffset );
404- throw new EOFException ("Read WALTrailer while reading WALEdits" );
391+ posAfterStr = this .inputStream .getPos () + "" ;
392+ } catch (Throwable t ) {
393+ LOG .trace ("Error getting pos for error message - ignoring" , t );
405394 }
406- } catch (EOFException eof ) {
407- // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
408- if (originalPosition < 0 ) {
409- LOG .warn ("Encountered a malformed edit, but can't seek back to last good position "
410- + "because originalPosition is negative. last offset={}" ,
411- this .inputStream .getPos (), eof );
412- throw eof ;
413- }
414- // If stuck at the same place and we got and exception, lets go back at the beginning.
415- if (inputStream .getPos () == originalPosition ) {
416- if (resetPosition ) {
417- LOG .warn ("Encountered a malformed edit, seeking to the beginning of the WAL since " +
418- "current position and original position match at {}" , originalPosition );
419- seekOnFs (0 );
420- } else {
421- LOG .debug ("Reached the end of file at position {}" , originalPosition );
422- }
395+ String message =
396+ " while reading " + expectedCells + " WAL KVs; started reading at " + posBefore
397+ + " and read up to " + posAfterStr ;
398+ IOException realEofEx = extractHiddenEof (ex );
399+ throw (EOFException ) new EOFException ("EOF " + message ).
400+ initCause (realEofEx != null ? realEofEx : ex );
401+ }
402+ if (trailerPresent && this .inputStream .getPos () > this .walEditsStopOffset ) {
403+ LOG .error (
404+ "Read WALTrailer while reading WALEdits. wal: " + this .path + ", inputStream.getPos(): "
405+ + this .inputStream .getPos () + ", walEditsStopOffset: " + this .walEditsStopOffset );
406+ throw new EOFException ("Read WALTrailer while reading WALEdits" );
407+ }
408+ } catch (EOFException eof ) {
409+ // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
410+ if (originalPosition < 0 ) {
411+ LOG .warn ("Encountered a malformed edit, but can't seek back to last good position "
412+ + "because originalPosition is negative. last offset={}" , this .inputStream .getPos (), eof );
413+ throw eof ;
414+ }
415+ // If stuck at the same place and we got and exception, lets go back at the beginning.
416+ if (inputStream .getPos () == originalPosition ) {
417+ if (resetPosition ) {
418+ LOG .warn ("Encountered a malformed edit, seeking to the beginning of the WAL since "
419+ + "current position and original position match at {}" , originalPosition );
420+ seekOnFs (0 );
423421 } else {
424- // Else restore our position to original location in hope that next time through we will
425- // read successfully.
426- LOG .warn ("Encountered a malformed edit, seeking back to last good position in file, " +
427- "from {} to {}" , inputStream .getPos (), originalPosition , eof );
428- seekOnFs (originalPosition );
422+ LOG .debug ("Reached the end of file at position {}" , originalPosition );
429423 }
430- return false ;
424+ } else {
425+ // Else restore our position to original location in hope that next time through we will
426+ // read successfully.
427+ LOG .warn ("Encountered a malformed edit, seeking back to last good position in file, "
428+ + "from {} to {}" , inputStream .getPos (), originalPosition , eof );
429+ seekOnFs (originalPosition );
431430 }
432- return true ;
431+ return false ;
433432 }
433+ return true ;
434434 }
435435
436436 private IOException extractHiddenEof (Exception ex ) {
0 commit comments