Commit 230fdc0
HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory (apache#5119)
Add a 'closed' flag to WALProps in AbstractFSWAL to indicate whether a WAL file has been closed; if it has not, we do not try to archive it. The file is marked as closed after we fully close it in the background close task, which then tries archiving again. Also modified some tests: since the archiving of a rolled WAL file is now asynchronous, they must wait for it instead of asserting directly. Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent 3e08f92 commit 230fdc0
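The pattern behind the fix is compact enough to show on its own: register the rolled file's properties before handing the close to a background thread, flip a volatile closed flag once the close finishes, and have the archiving pass skip anything still open. A minimal, self-contained sketch of that idea (class and method names are illustrative stand-ins, not the HBase API):

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical reduction of the patch's pattern; not the HBase classes.
public class ClosedFlagSketch {

  // stand-in for AbstractFSWAL.WALProps
  private static final class FileProps {
    volatile boolean closed = false;
  }

  private final Map<String, FileProps> rolledFiles = new ConcurrentSkipListMap<>();
  private final ExecutorService closeExecutor = Executors.newCachedThreadPool();

  void roll(String oldFile) {
    // register the rolled file BEFORE scheduling the close, so the background
    // task below can always find an entry to flip
    rolledFiles.put(oldFile, new FileProps());
    closeExecutor.execute(() -> {
      try {
        // ... the writer for oldFile would actually be closed here ...
      } finally {
        markClosedAndClean(oldFile); // flip the flag even if close failed
      }
    });
  }

  private void markClosedAndClean(String file) {
    FileProps props = rolledFiles.get(file);
    if (props != null) {
      props.closed = true;
      cleanOldLogs();
    }
  }

  // synchronized: reached from both the roll path and closeExecutor threads
  private synchronized void cleanOldLogs() {
    // archive only fully closed files; open ones are retried on the next pass
    rolledFiles.entrySet().removeIf(e -> e.getValue().closed);
  }
}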

10 files changed, +361 -287 lines

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 55 additions & 10 deletions
@@ -298,27 +298,34 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   final Comparator<Path> LOG_NAME_COMPARATOR =
     (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
 
-  private static final class WalProps {
+  private static final class WALProps {
 
     /**
      * Map the encoded region name to the highest sequence id.
      * <p/>
      * Contains all the regions it has an entry for.
      */
-    public final Map<byte[], Long> encodedName2HighestSequenceId;
+    private final Map<byte[], Long> encodedName2HighestSequenceId;
 
     /**
      * The log file size. Notice that the size may not be accurate if we do asynchronous close in
      * sub classes.
      */
-    public final long logSize;
+    private final long logSize;
 
     /**
      * The nanoTime of the log rolling, used to determine the time interval that has passed since.
      */
-    public final long rollTimeNs;
+    private final long rollTimeNs;
 
-    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
+    /**
+     * If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
+     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
+     * for safety.
+     */
+    private volatile boolean closed = false;
+
+    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
       this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
       this.logSize = logSize;
       this.rollTimeNs = System.nanoTime();

@@ -329,7 +336,7 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
    * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
    * (contained in the log file name).
    */
-  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
+  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
     new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
 
   /**

@@ -348,6 +355,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
 
   protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
 
+  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+
   // Run in caller if we get reject execution exception, to avoid aborting region server when we get
   // reject execution exception. Usually this should not happen but let's make it more robust.
   private final ExecutorService logArchiveExecutor =

@@ -697,7 +707,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
     Map<byte[], List<byte[]>> regions = null;
     int logCount = getNumRolledLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
-      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
+      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
       regions =
         this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
     }

@@ -720,17 +730,38 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
     return regions;
   }
 
+  /**
+   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
+   */
+  protected final void markClosedAndClean(Path path) {
+    WALProps props = walFile2Props.get(path);
+    // typically this should not be null, but if there is no big issue if it is already null, so
+    // let's make the code more robust
+    if (props != null) {
+      props.closed = true;
+      cleanOldLogs();
+    }
+  }
+
   /**
    * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
+   * <p/>
+   * Use synchronized because we may call this method in different threads, normally when replacing
+   * writer, and since now close writer may be asynchronous, we will also call this method in the
+   * closeExecutor, right after we actually close a WAL writer.
    */
-  private void cleanOldLogs() throws IOException {
+  private synchronized void cleanOldLogs() {
    List<Pair<Path, Long>> logsToArchive = null;
    long now = System.nanoTime();
    boolean mayLogTooOld = nextLogTooOldNs <= now;
    ArrayList<byte[]> regionsBlockingWal = null;
    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
-    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
+    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
+      if (!e.getValue().closed) {
+        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
+        continue;
+      }
       Path log = e.getKey();
       ArrayList<byte[]> regionsBlockingThisWal = null;
       long ageNs = now - e.getValue().rollTimeNs;

@@ -834,7 +865,7 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
     String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
     if (oldPath != null) {
       this.walFile2Props.put(oldPath,
-        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
+        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
       this.totalLogSize.addAndGet(oldFileLen);
       LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
         CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),

@@ -1029,6 +1060,20 @@ public Void call() throws Exception {
       // and abort the region server
       logArchiveExecutor.shutdown();
     }
+    // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still
+    // have some pending archiving tasks not finished yet, and in close we may archive all the
+    // remaining WAL files, there could be race if we do not wait for the background archive task
+    // finish
+    try {
+      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
+        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
+          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
+          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
+          + "\"");
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
+    }
   }
 
   @Override
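The last hunk above waits for the archive executor to drain during shutdown. Underneath it is the standard two-step ExecutorService shutdown idiom; a hedged sketch, with the helper name and messages invented for illustration:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdown {
  // Illustrative helper, not HBase code: stop accepting new tasks, then block
  // until in-flight tasks finish or the deadline passes.
  static void awaitQuiescence(ExecutorService executor, long timeoutMs) throws IOException {
    executor.shutdown(); // no new tasks; already-submitted tasks keep running
    try {
      if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        // mirrors the TimeoutIOException thrown in AbstractFSWAL above
        throw new IOException("tasks still pending after " + timeoutMs + "ms");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new InterruptedIOException("interrupted while waiting for tasks");
    }
  }
}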

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 27 additions & 23 deletions
@@ -35,7 +35,6 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

@@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long batchSize;
 
-  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
-
   private volatile AsyncFSOutput fsOut;
 
   private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

@@ -778,32 +774,40 @@ private void waitForSafePoint() {
     }
   }
 
-  protected final long closeWriter(AsyncWriter writer, Path path) {
-    if (writer != null) {
-      inflightWALClosures.put(path.getName(), writer);
-      long fileLength = writer.getLength();
-      closeExecutor.execute(() -> {
-        try {
-          writer.close();
-        } catch (IOException e) {
-          LOG.warn("close old writer failed", e);
-        } finally {
-          inflightWALClosures.remove(path.getName());
-        }
-      });
-      return fileLength;
-    } else {
-      return 0L;
-    }
+  private void closeWriter(AsyncWriter writer, Path path) {
+    inflightWALClosures.put(path.getName(), writer);
+    closeExecutor.execute(() -> {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.warn("close old writer failed", e);
+      } finally {
+        // call this even if the above close fails, as there is no other chance we can set closed to
+        // true, it will not cause big problems.
+        markClosedAndClean(path);
+        inflightWALClosures.remove(path.getName());
+      }
+    });
   }
 
   @Override
   protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
     throws IOException {
     Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
-    long oldFileLen = closeWriter(this.writer, oldPath);
-    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+    // we will call rollWriter in init method, where we want to create the first writer and
+    // obviously the previous writer is null, so here we need this null check. And why we must call
+    // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after
+    // closing the writer asynchronously, we need to make sure the WALProps is put into
+    // walFile2Props before we call markClosedAndClean
+    if (writer != null) {
+      long oldFileLen = writer.getLength();
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+      closeWriter(writer, oldPath);
+    } else {
+      logRollAndSetupWalProps(oldPath, newPath, 0);
+    }
+
     this.writer = nextWriter;
     if (nextWriter instanceof AsyncProtobufLogWriter) {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
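The long comment in doReplaceWriter encodes an ordering requirement that is easy to miss: the WALProps entry must be in walFile2Props before the background close runs, because the close task ends by calling markClosedAndClean(path), which looks that entry up. A hypothetical reduction of the race (not HBase code), showing why the patch registers first and closes second:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ReplaceWriterOrdering {
  private final ConcurrentMap<String, Boolean> walFile2Props = new ConcurrentHashMap<>();
  private final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();

  void replaceWriter(String oldPath) {
    // 1. register the rolled file's props while the writer is still ours
    walFile2Props.put(oldPath, Boolean.FALSE);
    // 2. only then close asynchronously; the task can now always find the entry
    closeExecutor.execute(() -> {
      // ... writer.close() would happen here ...
      // if step 1 ran after this task, the lookup below would miss and the
      // file would stay unarchivable until a later cleanOldLogs pass
      walFile2Props.computeIfPresent(oldPath, (path, closed) -> Boolean.TRUE);
    });
  }
}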

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 21 additions & 11 deletions
@@ -36,8 +36,6 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
   private final int waitOnShutdownInSeconds;
-  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs

@@ -377,28 +373,44 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
       LOG.warn(
         "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
     }
-    long oldFileLen = 0L;
     // It is at the safe point. Swap out writer from under the blocked writer thread.
+    // we will call rollWriter in init method, where we want to create the first writer and
+    // obviously the previous writer is null, so here we need this null check. And why we must
+    // call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean
+    // after closing the writer asynchronously, we need to make sure the WALProps is put into
+    // walFile2Props before we call markClosedAndClean
     if (this.writer != null) {
-      oldFileLen = this.writer.getLength();
+      long oldFileLen = this.writer.getLength();
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
       // In case of having unflushed entries or we already reached the
       // closeErrorsTolerated count, call the closeWriter inline rather than in async
       // way so that in case of an IOE we will throw it back and abort RS.
       inflightWALClosures.put(oldPath.getName(), writer);
       if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
-        closeWriter(this.writer, oldPath, true);
+        try {
+          closeWriter(this.writer, oldPath, true);
+        } finally {
+          inflightWALClosures.remove(oldPath.getName());
+        }
       } else {
         Writer localWriter = this.writer;
         closeExecutor.execute(() -> {
           try {
             closeWriter(localWriter, oldPath, false);
           } catch (IOException e) {
-            // We will never reach here.
+            LOG.warn("close old writer failed", e);
+          } finally {
+            // call this even if the above close fails, as there is no other chance we can set
+            // closed to true, it will not cause big problems.
+            markClosedAndClean(oldPath);
+            inflightWALClosures.remove(oldPath.getName());
           }
         });
       }
+    } else {
+      logRollAndSetupWalProps(oldPath, newPath, 0);
     }
-    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+
     this.writer = nextWriter;
     if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
       this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();

@@ -453,8 +465,6 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
       }
       LOG.warn("Riding over failed WAL close of " + path
         + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
-    } finally {
-      inflightWALClosures.remove(path.getName());
     }
   }
 

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java

Lines changed: 8 additions & 2 deletions
@@ -415,10 +415,11 @@ public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
       serverName = ServerName.parseServerName(logDirName);
     } catch (IllegalArgumentException | IllegalStateException ex) {
       serverName = null;
-      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
+      LOG.warn("Cannot parse a server name from path={}", logFile, ex);
     }
     if (serverName != null && serverName.getStartCode() < 0) {
-      LOG.warn("Invalid log file path=" + logFile);
+      LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
+        serverName.getStartCode());
       serverName = null;
     }
     return serverName;

@@ -483,6 +484,11 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
     }
 
     ServerName serverName = getServerNameFromWALDirectoryName(path);
+    if (serverName == null) {
+      LOG.warn("Can not extract server name from path {}, "
+        + "give up searching the separated old log dir", path);
+      return null;
+    }
     // Try finding the log in separate old log dir
     oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
       .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
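The logging change in getServerNameFromWALDirectoryName leans on an SLF4J behavior worth spelling out: when the last argument of a parameterized log call is a Throwable with no matching {}, SLF4J renders it as a full stack trace, whereas the old string concatenation kept only ex.getMessage(). A minimal sketch, assuming only slf4j-api on the classpath:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class LogThrowableExample {
  private static final Logger LOG = LoggerFactory.getLogger(LogThrowableExample.class);

  static void demo(String path) {
    try {
      throw new IllegalArgumentException("bad WAL dir name"); // stand-in failure
    } catch (IllegalArgumentException ex) {
      // one placeholder, two extra args: "path" fills {}, and the trailing
      // Throwable is logged as a stack trace rather than swallowed
      LOG.warn("Cannot parse a server name from path={}", path, ex);
    }
  }
}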

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java

Lines changed: 5 additions & 3 deletions
@@ -411,9 +411,11 @@ public void testWALRollWriting() throws Exception {
       r.flush(true);
     }
     ADMIN.rollWALWriter(regionServer.getServerName());
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
+    TEST_UTIL.waitFor(5000, () -> {
+      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
+      return count <= 2;
+    });
   }
 
   private void setUpforLogRolling() {

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java

Lines changed: 5 additions & 3 deletions
@@ -152,9 +152,11 @@ public void testRollWALWALWriter() throws Exception {
       r.flush(true);
     }
     admin.rollWALWriter(regionServer.getServerName()).join();
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
+    TEST_UTIL.waitFor(5000, () -> {
+      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
+      return count <= 2;
+    });
   }
 
   private void setUpforLogRolling() {
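Both admin tests switch from a one-shot assertion to TEST_UTIL.waitFor(timeout, condition), since the rolled-file count now drops only after the background close and archive complete. Stripped of the HBase test harness, the idiom is a bounded poll; a hedged, self-contained version (poll interval and error message are invented):

import java.util.function.BooleanSupplier;

final class WaitFor {
  // Generic stand-in for HBaseTestingUtility.waitFor: retry a condition until
  // it holds or the deadline passes, instead of asserting its very first value.
  static void waitFor(long timeoutMs, BooleanSupplier condition) throws InterruptedException {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadlineNanos >= 0) {
        throw new AssertionError("condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(100); // invented poll interval
    }
  }
}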

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java

Lines changed: 3 additions & 4 deletions
@@ -477,9 +477,8 @@ public void testFlushingWhenLogRolling() throws Exception {
         // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
         int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
         assertNull(getWAL(desiredRegion).rollWriter());
-        while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
-          Thread.sleep(100);
-        }
+        TEST_UTIL.waitFor(60000,
+          () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
       }
       assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
       assertTrue(

@@ -518,7 +517,7 @@ public String explainFailure() throws Exception {
         desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
       // let WAL cleanOldLogs
       assertNull(getWAL(desiredRegion).rollWriter(true));
-      assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
+      TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
     } finally {
       TEST_UTIL.shutdownMiniCluster();
     }
