Commit 17d15a1
HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory (#5119)
Add a 'closed' flag to WALProps in AbstractFSWAL to indicate whether a WAL file has been closed; if it has not, we will not try to archive it. The flag is set after the file is fully closed in the background close task, and archiving is then attempted again. Also modified some tests: since the archiving of a rolled WAL file is now asynchronous, they need to wait for it instead of asserting directly.

Signed-off-by: Viraj Jasani <vjasani@apache.org>

(cherry picked from commit 230fdc0)
1 parent 3b24b3b commit 17d15a1
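For orientation before the per-file diffs, here is a condensed, standalone Java sketch of the pattern this commit introduces: each rolled WAL file gets a volatile 'closed' flag in its props, the background close task flips the flag once the writer is fully closed, and the archive pass skips any file whose flag is still false. The names WALProps, markClosedAndClean and cleanOldLogs mirror the diff below; everything else (the String path key, the archive placeholder, the executor setup) is a simplified stand-in, not the actual HBase code.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Standalone sketch of the closed-flag pattern; not the real AbstractFSWAL code.
public class ClosedFlagSketch {

  private static final class WALProps {
    final long logSize;
    // Set only after the background close task has fully closed the file.
    volatile boolean closed = false;

    WALProps(long logSize) {
      this.logSize = logSize;
    }
  }

  private final Map<String, WALProps> walFile2Props = new ConcurrentSkipListMap<>();
  private final ExecutorService closeExecutor = Executors.newCachedThreadPool();

  // Called when a WAL is rolled: record the props first, then close asynchronously.
  void rollAndCloseAsync(String path, long length, AutoCloseable writer) {
    walFile2Props.put(path, new WALProps(length));
    closeExecutor.execute(() -> {
      try {
        writer.close();
      } catch (Exception e) {
        // Even if close fails, still mark closed so the file can be archived later.
      } finally {
        markClosedAndClean(path);
      }
    });
  }

  // Mark the rolled file as closed, then see whether anything can be archived now.
  private void markClosedAndClean(String path) {
    WALProps props = walFile2Props.get(path);
    if (props != null) {
      props.closed = true;
      cleanOldLogs();
    }
  }

  // Archive only files whose close has completed; unclosed files are retried next time.
  private synchronized void cleanOldLogs() {
    walFile2Props.entrySet().removeIf(e -> {
      if (!e.getValue().closed) {
        return false; // not closed yet, skip for now
      }
      archive(e.getKey());
      return true;
    });
  }

  private void archive(String path) {
    // placeholder for moving the file to the oldWALs directory
  }
}

The point of the flag is ordering: the props entry must be registered before the asynchronous close runs, so markClosedAndClean always finds an entry to mark, and archiving can never race ahead of the close.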

File tree

10 files changed: +360, -286 lines


hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 54 additions & 9 deletions
@@ -289,22 +289,29 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
   final Comparator<Path> LOG_NAME_COMPARATOR =
     (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
 
-  private static final class WalProps {
+  private static final class WALProps {
 
     /**
      * Map the encoded region name to the highest sequence id.
      * <p/>
      * Contains all the regions it has an entry for.
      */
-    public final Map<byte[], Long> encodedName2HighestSequenceId;
+    private final Map<byte[], Long> encodedName2HighestSequenceId;
 
     /**
      * The log file size. Notice that the size may not be accurate if we do asynchronous close in
      * sub classes.
      */
-    public final long logSize;
+    private final long logSize;
 
-    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
+    /**
+     * If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
+     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
+     * for safety.
+     */
+    private volatile boolean closed = false;
+
+    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
       this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
       this.logSize = logSize;
     }
@@ -314,7 +321,7 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
    * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
    * (contained in the log file name).
    */
-  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
+  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
     new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
 
   /**
@@ -333,6 +340,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
 
   protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
 
+  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+
   // Run in caller if we get reject execution exception, to avoid aborting region server when we get
   // reject execution exception. Usually this should not happen but let's make it more robust.
   private final ExecutorService logArchiveExecutor =
@@ -680,7 +690,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
     Map<byte[], List<byte[]>> regions = null;
     int logCount = getNumRolledLogFiles();
     if (logCount > this.maxLogs && logCount > 0) {
-      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
+      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
       regions =
         this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
     }
@@ -703,14 +713,35 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
     return regions;
   }
 
+  /**
+   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
+   */
+  protected final void markClosedAndClean(Path path) {
+    WALProps props = walFile2Props.get(path);
+    // typically this should not be null, but if there is no big issue if it is already null, so
+    // let's make the code more robust
+    if (props != null) {
+      props.closed = true;
+      cleanOldLogs();
+    }
+  }
+
   /**
    * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
+   * <p/>
+   * Use synchronized because we may call this method in different threads, normally when replacing
+   * writer, and since now close writer may be asynchronous, we will also call this method in the
+   * closeExecutor, right after we actually close a WAL writer.
    */
-  private void cleanOldLogs() throws IOException {
+  private synchronized void cleanOldLogs() {
     List<Pair<Path, Long>> logsToArchive = null;
     // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
     // are older than what is currently in memory, the WAL can be GC'd.
-    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
+    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
+      if (!e.getValue().closed) {
+        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
+        continue;
+      }
       Path log = e.getKey();
       Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
       if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
@@ -792,7 +823,7 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
     String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
     if (oldPath != null) {
       this.walFile2Props.put(oldPath,
-        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
+        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
       this.totalLogSize.addAndGet(oldFileLen);
       LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
         CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
@@ -988,6 +1019,20 @@ public Void call() throws Exception {
       // and abort the region server
       logArchiveExecutor.shutdown();
     }
+    // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still
+    // have some pending archiving tasks not finished yet, and in close we may archive all the
+    // remaining WAL files, there could be race if we do not wait for the background archive task
+    // finish
+    try {
+      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
+        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
+          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
+          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
+          + "\"");
+      }
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
+    }
   }
 
   @Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 27 additions & 23 deletions
@@ -35,7 +35,6 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long batchSize;
 
-  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
-
   private volatile AsyncFSOutput fsOut;
 
   private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
@@ -718,32 +714,40 @@ private void waitForSafePoint() {
     }
   }
 
-  protected final long closeWriter(AsyncWriter writer, Path path) {
-    if (writer != null) {
-      inflightWALClosures.put(path.getName(), writer);
-      long fileLength = writer.getLength();
-      closeExecutor.execute(() -> {
-        try {
-          writer.close();
-        } catch (IOException e) {
-          LOG.warn("close old writer failed", e);
-        } finally {
-          inflightWALClosures.remove(path.getName());
-        }
-      });
-      return fileLength;
-    } else {
-      return 0L;
-    }
+  private void closeWriter(AsyncWriter writer, Path path) {
+    inflightWALClosures.put(path.getName(), writer);
+    closeExecutor.execute(() -> {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.warn("close old writer failed", e);
+      } finally {
+        // call this even if the above close fails, as there is no other chance we can set closed to
+        // true, it will not cause big problems.
+        markClosedAndClean(path);
+        inflightWALClosures.remove(path.getName());
+      }
+    });
   }
 
   @Override
   protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
     throws IOException {
     Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
-    long oldFileLen = closeWriter(this.writer, oldPath);
-    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+    // we will call rollWriter in init method, where we want to create the first writer and
+    // obviously the previous writer is null, so here we need this null check. And why we must call
+    // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after
+    // closing the writer asynchronously, we need to make sure the WALProps is put into
+    // walFile2Props before we call markClosedAndClean
+    if (writer != null) {
+      long oldFileLen = writer.getLength();
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+      closeWriter(writer, oldPath);
+    } else {
+      logRollAndSetupWalProps(oldPath, newPath, 0);
+    }
+
     this.writer = nextWriter;
     if (nextWriter instanceof AsyncProtobufLogWriter) {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 21 additions & 11 deletions
@@ -36,8 +36,6 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
   private final AtomicInteger closeErrorCount = new AtomicInteger();
 
   private final int waitOnShutdownInSeconds;
-  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
@@ -376,28 +372,44 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
       LOG.warn(
         "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
     }
-    long oldFileLen = 0L;
     // It is at the safe point. Swap out writer from under the blocked writer thread.
+    // we will call rollWriter in init method, where we want to create the first writer and
+    // obviously the previous writer is null, so here we need this null check. And why we must
+    // call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean
+    // after closing the writer asynchronously, we need to make sure the WALProps is put into
+    // walFile2Props before we call markClosedAndClean
     if (this.writer != null) {
-      oldFileLen = this.writer.getLength();
+      long oldFileLen = this.writer.getLength();
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
       // In case of having unflushed entries or we already reached the
       // closeErrorsTolerated count, call the closeWriter inline rather than in async
       // way so that in case of an IOE we will throw it back and abort RS.
       inflightWALClosures.put(oldPath.getName(), writer);
       if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
-        closeWriter(this.writer, oldPath, true);
+        try {
+          closeWriter(this.writer, oldPath, true);
+        } finally {
+          inflightWALClosures.remove(oldPath.getName());
+        }
       } else {
         Writer localWriter = this.writer;
         closeExecutor.execute(() -> {
           try {
             closeWriter(localWriter, oldPath, false);
           } catch (IOException e) {
-            // We will never reach here.
+            LOG.warn("close old writer failed", e);
+          } finally {
+            // call this even if the above close fails, as there is no other chance we can set
+            // closed to true, it will not cause big problems.
+            markClosedAndClean(oldPath);
+            inflightWALClosures.remove(oldPath.getName());
           }
         });
       }
+    } else {
+      logRollAndSetupWalProps(oldPath, newPath, 0);
     }
-    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+
     this.writer = nextWriter;
     if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
       this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@@ -452,8 +464,6 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
       }
       LOG.warn("Riding over failed WAL close of " + path
         + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
-    } finally {
-      inflightWALClosures.remove(path.getName());
     }
   }
 

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java

Lines changed: 8 additions & 2 deletions
@@ -394,10 +394,11 @@ public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
       serverName = ServerName.parseServerName(logDirName);
     } catch (IllegalArgumentException | IllegalStateException ex) {
       serverName = null;
-      LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
+      LOG.warn("Cannot parse a server name from path={}", logFile, ex);
     }
     if (serverName != null && serverName.getStartCode() < 0) {
-      LOG.warn("Invalid log file path=" + logFile);
+      LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
+        serverName.getStartCode());
       serverName = null;
     }
     return serverName;
@@ -462,6 +463,11 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
     }
 
     ServerName serverName = getServerNameFromWALDirectoryName(path);
+    if (serverName == null) {
+      LOG.warn("Can not extract server name from path {}, "
+        + "give up searching the separated old log dir", path);
+      return null;
+    }
     // Try finding the log in separate old log dir
     oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
       .append(Path.SEPARATOR).append(serverName.getServerName()).toString());

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java

Lines changed: 5 additions & 3 deletions
@@ -419,9 +419,11 @@ public void testWALRollWriting() throws Exception {
       r.flush(true);
     }
     ADMIN.rollWALWriter(regionServer.getServerName());
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
+    TEST_UTIL.waitFor(5000, () -> {
+      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
+      return count <= 2;
+    });
   }
 
   private void setUpforLogRolling() {

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java

Lines changed: 5 additions & 3 deletions
@@ -152,9 +152,11 @@ public void testRollWALWALWriter() throws Exception {
       r.flush(true);
     }
     admin.rollWALWriter(regionServer.getServerName()).join();
-    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
-    LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
-    assertTrue(("actual count: " + count), count <= 2);
+    TEST_UTIL.waitFor(5000, () -> {
+      int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
+      LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
+      return count <= 2;
+    });
   }
 
   private void setUpforLogRolling() {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java

Lines changed: 3 additions & 4 deletions
@@ -488,9 +488,8 @@ public void testFlushingWhenLogRolling() throws Exception {
         // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
         int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
         assertNull(getWAL(desiredRegion).rollWriter());
-        while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
-          Thread.sleep(100);
-        }
+        TEST_UTIL.waitFor(60000,
+          () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
       }
       assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
       assertTrue(
@@ -529,7 +528,7 @@ public String explainFailure() throws Exception {
         desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
       // let WAL cleanOldLogs
       assertNull(getWAL(desiredRegion).rollWriter(true));
-      assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
+      TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
     } finally {
       TEST_UTIL.shutdownMiniCluster();
     }
