@@ -421,9 +421,10 @@ public void recoverLease() throws IOException {
421421 }
422422
423423 // Create new state-log
424- if (!rollWriter (flushLogId + 1 )) {
424+ long newFlushLogId = flushLogId + 1 ;
425+ if (!rollWriter (newFlushLogId )) {
425426 // someone else has already created this log
426- LOG .debug ("Someone else has already created log {}. Retrying." , flushLogId );
427+ LOG .debug ("Someone else has already created log {}. Retrying." , newFlushLogId );
427428 continue ;
428429 }
429430
@@ -1042,8 +1043,9 @@ private boolean rollWriter() throws IOException {
10421043 }
10431044
10441045 // Create new state-log
1045- if (!rollWriter (flushLogId + 1 )) {
1046- LOG .warn ("someone else has already created log {}" , flushLogId );
1046+ long newFlushLogId = flushLogId + 1 ;
1047+ if (!rollWriter (newFlushLogId )) {
1048+ LOG .warn ("someone else has already created log {}" , newFlushLogId );
10471049 return false ;
10481050 }
10491051
@@ -1100,7 +1102,8 @@ boolean rollWriter(long logId) throws IOException {
11001102 startPos = newStream .getPos ();
11011103 } catch (IOException ioe ) {
11021104 LOG .warn ("Encountered exception writing header" , ioe );
1103- newStream .close ();
1105+ // Close and delete the incomplete file
1106+ closeAndDeleteIncompleteFile (newStream , newLogFile );
11041107 return false ;
11051108 }
11061109
@@ -1165,6 +1168,29 @@ private void closeCurrentLogStream(boolean abort) {
11651168 stream = null ;
11661169 }
11671170
1171+ private void closeAndDeleteIncompleteFile (FSDataOutputStream newStream , Path newLogFile ) {
1172+ // Close the FS
1173+ try {
1174+ newStream .close ();
1175+ } catch (IOException e ) {
1176+ LOG .error ("Exception occured while closing the file {}" , newLogFile , e );
1177+ }
1178+
1179+ // Delete the incomplete file
1180+ try {
1181+ if (!fs .delete (newLogFile , false )) {
1182+ LOG .warn (
1183+ "Failed to delete the log file {}, increasing the log id by 1 for the next roll attempt" ,
1184+ newLogFile );
1185+ flushLogId ++;
1186+ }
1187+ } catch (IOException e ) {
1188+ LOG .warn ("Exception occured while deleting the file {}" , newLogFile , e );
1189+ flushLogId ++;
1190+ LOG .info ("Increased the log id to {}" , flushLogId );
1191+ }
1192+ }
1193+
11681194 // ==========================================================================
11691195 // Log Files cleaner helpers
11701196 // ==========================================================================
0 commit comments