Skip to content

Commit fa199e0

Browse files
committed
HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory
1 parent 997d132 commit fa199e0

File tree

10 files changed

+329
-286
lines changed

10 files changed

+329
-286
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -298,27 +298,34 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
298298
final Comparator<Path> LOG_NAME_COMPARATOR =
299299
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
300300

301-
private static final class WalProps {
301+
private static final class WALProps {
302302

303303
/**
304304
* Map the encoded region name to the highest sequence id.
305305
* <p/>
306306
* Contains all the regions it has an entry for.
307307
*/
308-
public final Map<byte[], Long> encodedName2HighestSequenceId;
308+
final Map<byte[], Long> encodedName2HighestSequenceId;
309309

310310
/**
311311
* The log file size. Notice that the size may not be accurate if we do asynchronous close in
312312
* sub classes.
313313
*/
314-
public final long logSize;
314+
final long logSize;
315315

316316
/**
317317
* The nanoTime of the log rolling, used to determine the time interval that has passed since.
318318
*/
319-
public final long rollTimeNs;
319+
final long rollTimeNs;
320320

321-
public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
321+
/**
322+
* If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
323+
* rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
324+
* for safety.
325+
*/
326+
volatile boolean closed = false;
327+
328+
public WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
322329
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
323330
this.logSize = logSize;
324331
this.rollTimeNs = System.nanoTime();
@@ -329,7 +336,7 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
329336
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
330337
* (contained in the log file name).
331338
*/
332-
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
339+
protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
333340
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
334341

335342
/**
@@ -348,6 +355,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
348355

349356
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
350357

358+
protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
359+
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
360+
351361
// Run in caller if we get reject execution exception, to avoid aborting region server when we get
352362
// reject execution exception. Usually this should not happen but let's make it more robust.
353363
private final ExecutorService logArchiveExecutor =
@@ -697,7 +707,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
697707
Map<byte[], List<byte[]>> regions = null;
698708
int logCount = getNumRolledLogFiles();
699709
if (logCount > this.maxLogs && logCount > 0) {
700-
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
710+
Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
701711
regions =
702712
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
703713
}
@@ -720,17 +730,34 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
720730
return regions;
721731
}
722732

733+
/**
734+
* Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
735+
*/
736+
protected final void markClosedAndClean(Path path) {
737+
WALProps props = walFile2Props.get(path);
738+
// typically this should not be null, but if there is no big issue if it is already null, so
739+
// let's make the code more robust
740+
if (props != null) {
741+
props.closed = true;
742+
cleanOldLogs();
743+
}
744+
}
745+
723746
/**
724747
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
725748
*/
726-
private void cleanOldLogs() throws IOException {
749+
private void cleanOldLogs() {
727750
List<Pair<Path, Long>> logsToArchive = null;
728751
long now = System.nanoTime();
729752
boolean mayLogTooOld = nextLogTooOldNs <= now;
730753
ArrayList<byte[]> regionsBlockingWal = null;
731754
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
732755
// are older than what is currently in memory, the WAL can be GC'd.
733-
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
756+
for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
757+
if (!e.getValue().closed) {
758+
LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
759+
continue;
760+
}
734761
Path log = e.getKey();
735762
ArrayList<byte[]> regionsBlockingThisWal = null;
736763
long ageNs = now - e.getValue().rollTimeNs;
@@ -834,7 +861,7 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
834861
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
835862
if (oldPath != null) {
836863
this.walFile2Props.put(oldPath,
837-
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
864+
new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
838865
this.totalLogSize.addAndGet(oldFileLen);
839866
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
840867
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.SortedSet;
3636
import java.util.TreeSet;
3737
import java.util.concurrent.ExecutorService;
38-
import java.util.concurrent.Executors;
3938
import java.util.concurrent.LinkedBlockingQueue;
4039
import java.util.concurrent.ThreadPoolExecutor;
4140
import java.util.concurrent.TimeUnit;
@@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
179178

180179
private final long batchSize;
181180

182-
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
183-
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
184-
185181
private volatile AsyncFSOutput fsOut;
186182

187183
private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
@@ -778,32 +774,35 @@ private void waitForSafePoint() {
778774
}
779775
}
780776

781-
protected final long closeWriter(AsyncWriter writer, Path path) {
782-
if (writer != null) {
783-
inflightWALClosures.put(path.getName(), writer);
784-
long fileLength = writer.getLength();
785-
closeExecutor.execute(() -> {
786-
try {
787-
writer.close();
788-
} catch (IOException e) {
789-
LOG.warn("close old writer failed", e);
790-
} finally {
791-
inflightWALClosures.remove(path.getName());
792-
}
793-
});
794-
return fileLength;
795-
} else {
796-
return 0L;
797-
}
777+
private void closeWriter(AsyncWriter writer, Path path) {
778+
inflightWALClosures.put(path.getName(), writer);
779+
closeExecutor.execute(() -> {
780+
try {
781+
writer.close();
782+
} catch (IOException e) {
783+
LOG.warn("close old writer failed", e);
784+
} finally {
785+
// call this even if the above close fails, as there is no other chance we can set closed to
786+
// true, it will not cause big problems.
787+
markClosedAndClean(path);
788+
inflightWALClosures.remove(path.getName());
789+
}
790+
});
798791
}
799792

800793
@Override
801794
protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
802795
throws IOException {
803796
Preconditions.checkNotNull(nextWriter);
804797
waitForSafePoint();
805-
long oldFileLen = closeWriter(this.writer, oldPath);
806-
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
798+
if (writer != null) {
799+
long oldFileLen = writer.getLength();
800+
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
801+
closeWriter(writer, oldPath);
802+
} else {
803+
logRollAndSetupWalProps(oldPath, newPath, 0);
804+
}
805+
807806
this.writer = nextWriter;
808807
if (nextWriter instanceof AsyncProtobufLogWriter) {
809808
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import java.util.List;
3737
import java.util.concurrent.BlockingQueue;
3838
import java.util.concurrent.CountDownLatch;
39-
import java.util.concurrent.ExecutorService;
40-
import java.util.concurrent.Executors;
4139
import java.util.concurrent.LinkedBlockingQueue;
4240
import java.util.concurrent.TimeUnit;
4341
import java.util.concurrent.atomic.AtomicInteger;
@@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
169167
private final AtomicInteger closeErrorCount = new AtomicInteger();
170168

171169
private final int waitOnShutdownInSeconds;
172-
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
173-
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
174170

175171
/**
176172
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
@@ -377,10 +373,10 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
377373
LOG.warn(
378374
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
379375
}
380-
long oldFileLen = 0L;
381376
// It is at the safe point. Swap out writer from under the blocked writer thread.
382377
if (this.writer != null) {
383-
oldFileLen = this.writer.getLength();
378+
long oldFileLen = this.writer.getLength();
379+
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
384380
// In case of having unflushed entries or we already reached the
385381
// closeErrorsTolerated count, call the closeWriter inline rather than in async
386382
// way so that in case of an IOE we will throw it back and abort RS.
@@ -393,12 +389,20 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
393389
try {
394390
closeWriter(localWriter, oldPath, false);
395391
} catch (IOException e) {
396-
// We will never reach here.
392+
LOG.warn("close old writer failed", e);
393+
} finally {
394+
// call this even if the above close fails, as there is no other chance we can set
395+
// closed to
396+
// true, it will not cause big problems.
397+
markClosedAndClean(oldPath);
398+
inflightWALClosures.remove(oldPath.getName());
397399
}
398400
});
399401
}
402+
} else {
403+
logRollAndSetupWalProps(oldPath, newPath, 0);
400404
}
401-
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
405+
402406
this.writer = nextWriter;
403407
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
404408
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@@ -453,8 +457,6 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
453457
}
454458
LOG.warn("Riding over failed WAL close of " + path
455459
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
456-
} finally {
457-
inflightWALClosures.remove(path.getName());
458460
}
459461
}
460462

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,10 +415,11 @@ public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
415415
serverName = ServerName.parseServerName(logDirName);
416416
} catch (IllegalArgumentException | IllegalStateException ex) {
417417
serverName = null;
418-
LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
418+
LOG.warn("Cannot parse a server name from path={}", logFile, ex);
419419
}
420420
if (serverName != null && serverName.getStartCode() < 0) {
421-
LOG.warn("Invalid log file path=" + logFile);
421+
LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
422+
serverName.getStartCode());
422423
serverName = null;
423424
}
424425
return serverName;
@@ -483,6 +484,11 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
483484
}
484485

485486
ServerName serverName = getServerNameFromWALDirectoryName(path);
487+
if (serverName == null) {
488+
LOG.warn("Can not extract server name from path {}, "
489+
+ "give up searching the separated old log dir", path);
490+
return null;
491+
}
486492
// Try finding the log in separate old log dir
487493
oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
488494
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,9 +411,11 @@ public void testWALRollWriting() throws Exception {
411411
r.flush(true);
412412
}
413413
ADMIN.rollWALWriter(regionServer.getServerName());
414-
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
415-
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
416-
assertTrue(("actual count: " + count), count <= 2);
414+
TEST_UTIL.waitFor(5000, () -> {
415+
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
416+
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
417+
return count <= 2;
418+
});
417419
}
418420

419421
private void setUpforLogRolling() {

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,11 @@ public void testRollWALWALWriter() throws Exception {
152152
r.flush(true);
153153
}
154154
admin.rollWALWriter(regionServer.getServerName()).join();
155-
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
156-
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
157-
assertTrue(("actual count: " + count), count <= 2);
155+
TEST_UTIL.waitFor(5000, () -> {
156+
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
157+
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
158+
return count <= 2;
159+
});
158160
}
159161

160162
private void setUpforLogRolling() {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,9 +477,8 @@ public void testFlushingWhenLogRolling() throws Exception {
477477
// Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
478478
int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
479479
assertNull(getWAL(desiredRegion).rollWriter());
480-
while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
481-
Thread.sleep(100);
482-
}
480+
TEST_UTIL.waitFor(60000,
481+
() -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
483482
}
484483
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
485484
assertTrue(
@@ -518,7 +517,7 @@ public String explainFailure() throws Exception {
518517
desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
519518
// let WAL cleanOldLogs
520519
assertNull(getWAL(desiredRegion).rollWriter(true));
521-
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
520+
TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
522521
} finally {
523522
TEST_UTIL.shutdownMiniCluster();
524523
}

0 commit comments

Comments
 (0)