From 850d3eac7765113c609e2d519fe260f42ede961b Mon Sep 17 00:00:00 2001
From: huhaiyang
Date: Mon, 4 Dec 2023 21:19:33 +0800
Subject: [PATCH] HDFS-17250. EditLogTailer#triggerActiveLogRoll should handle thread Interrupted (#6266). Contributed by Haiyang Hu.

Reviewed-by: ZanderXu
Signed-off-by: He Xiaoqiao
---
 .../server/namenode/ha/EditLogTailer.java     | 27 +++++++
 .../server/namenode/ha/TestEditLogTailer.java | 75 +++++++++++++++++++
 2 files changed, 102 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index d43035ba731e2..d14d8a8892202 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -612,8 +612,15 @@ public T call() throws IOException {
     private NamenodeProtocol getActiveNodeProxy() throws IOException {
       if (cachedActiveProxy == null) {
         while (true) {
+          // If the thread is interrupted, quit by returning null.
+          if (Thread.currentThread().isInterrupted()) {
+            LOG.warn("Interrupted while trying to getActiveNodeProxy.");
+            return null;
+          }
+
           // if we have reached the max loop count, quit by returning null
           if ((nnLoopCount / nnCount) >= maxRetries) {
+            LOG.warn("Have reached the max loop count ({}).", nnLoopCount);
             return null;
           }
 
@@ -638,4 +645,24 @@ private NamenodeProtocol getActiveNodeProxy() throws IOException {
       return cachedActiveProxy;
     }
   }
+
+  @VisibleForTesting
+  public NamenodeProtocol getCachedActiveProxy() {
+    return cachedActiveProxy;
+  }
+
+  @VisibleForTesting
+  public long getLastRollTimeMs() {
+    return lastRollTimeMs;
+  }
+
+  @VisibleForTesting
+  public RemoteNameNodeInfo getCurrentNN() {
+    return currentNN;
+  }
+
+  @VisibleForTesting
+  public void setShouldRunForTest(boolean shouldRun) {
+    this.tailerThread.setShouldRun(shouldRun);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 23569130c84a8..4e88cd389ea3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -25,6 +25,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.BindException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -462,6 +463,80 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
     }
   }
 
+  @Test
+  public void testRollEditLogHandleThreadInterruption()
+      throws IOException, InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    // RollEdits timeout 1s.
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 1);
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = createMiniDFSCluster(conf, 3);
+      cluster.transitionToActive(2);
+      EditLogTailer tailer = Mockito.spy(
+          cluster.getNamesystem(0).getEditLogTailer());
+
+      // Stop the edit log tail thread for testing.
+      tailer.setShouldRunForTest(false);
+
+      final AtomicInteger invokedTimes = new AtomicInteger(0);
+
+      // For nn0 run triggerActiveLogRoll, nns is [nn1,nn2].
+      // Mock the NameNodeProxy for testing.
+      // An InterruptedIOException will be thrown when requesting to nn1.
+      when(tailer.getNameNodeProxy()).thenReturn(
+          tailer.new MultipleNameNodeProxy<Void>() {
+            @Override
+            protected Void doWork() throws IOException {
+              invokedTimes.getAndIncrement();
+              if (tailer.getCurrentNN().getNameNodeID().equals("nn1")) {
+                while (true) {
+                  Thread.yield();
+                  if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedIOException("It is an Interrupted IOException.");
+                  }
+                }
+              } else {
+                tailer.getCachedActiveProxy().rollEditLog();
+                return null;
+              }
+            }
+          }
+      );
+
+      // Record the initial LastRollTimeMs value.
+      // This time will be updated only when triggerActiveLogRoll is executed successfully.
+      long initLastRollTimeMs = tailer.getLastRollTimeMs();
+
+      // Execute triggerActiveLogRoll for the first time.
+      // The MultipleNameNodeProxy uses round-robin to look for an active NN to roll the edit log.
+      // Here, a request will be made to nn1, and the main thread will trigger a Timeout and
+      // the doWork() method will throw an InterruptedIOException.
+      // The getActiveNodeProxy() method will determine that the thread is interrupted
+      // and will return null.
+      tailer.triggerActiveLogRoll();
+
+      // Execute triggerActiveLogRoll for the second time.
+      // A request will be made to nn2 and the rollEditLog will be successfully finished and
+      // lastRollTimeMs will be updated.
+      tailer.triggerActiveLogRoll();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return tailer.getLastRollTimeMs() > initLastRollTimeMs;
+        }
+      }, 100, 10000);
+
+      // The total number of invoked times should be 2.
+      assertEquals(2, invokedTimes.get());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   private static void waitForStandbyToCatchUpWithInProgressEdits(
       final NameNode standby, final long activeTxId,
       int maxWaitSec) throws Exception {