Skip to content

Commit

Permalink
HDFS-17250. EditLogTailer#triggerActiveLogRoll should handle thread Interrupted (apache#6266). Contributed by Haiyang Hu.
Browse files Browse the repository at this point in the history

Reviewed-by: ZanderXu <zanderxu@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
  • Loading branch information
haiyang1987 authored and jiajunmao committed Feb 6, 2024
1 parent e8d8af3 commit 850d3ea
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,15 @@ public T call() throws IOException {
private NamenodeProtocol getActiveNodeProxy() throws IOException {
if (cachedActiveProxy == null) {
while (true) {
// If the thread is interrupted, quit by returning null.
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Interrupted while trying to getActiveNodeProxy.");
return null;
}

// if we have reached the max loop count, quit by returning null
if ((nnLoopCount / nnCount) >= maxRetries) {
LOG.warn("Have reached the max loop count ({}).", nnLoopCount);
return null;
}

Expand All @@ -638,4 +645,24 @@ private NamenodeProtocol getActiveNodeProxy() throws IOException {
return cachedActiveProxy;
}
}

/**
 * Test-only accessor for the cached active NameNode proxy.
 *
 * @return the current value of {@code cachedActiveProxy}; may be
 *         {@code null} when no proxy has been cached yet.
 */
@VisibleForTesting
public NamenodeProtocol getCachedActiveProxy() {
  return this.cachedActiveProxy;
}

/**
 * Test-only accessor for the last recorded log-roll time.
 *
 * @return the current value of {@code lastRollTimeMs}.
 */
@VisibleForTesting
public long getLastRollTimeMs() {
  return this.lastRollTimeMs;
}

/**
 * Test-only accessor for the remote NameNode currently selected by this tailer.
 *
 * @return the current value of {@code currentNN}.
 */
@VisibleForTesting
public RemoteNameNodeInfo getCurrentNN() {
  return this.currentNN;
}

/**
 * Test-only hook that forwards the given flag to the tailer thread's
 * {@code setShouldRun}, letting a test stop (or resume) the tailing loop.
 *
 * @param shouldRun value passed through to {@code tailerThread.setShouldRun}.
 */
@VisibleForTesting
public void setShouldRunForTest(boolean shouldRun) {
  tailerThread.setShouldRun(shouldRun);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
Expand Down Expand Up @@ -462,6 +463,80 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
}
}

/**
 * Verifies that triggerActiveLogRoll copes with the roll request being
 * interrupted: the first attempt (to nn1) is interrupted and must not update
 * lastRollTimeMs, while the second attempt (to nn2) must succeed and update it.
 */
@Test
public void testRollEditLogHandleThreadInterruption()
    throws IOException, InterruptedException, TimeoutException {
  final Configuration conf = getConf();
  // Keep the rollEdits timeout at 1s so the stuck request to nn1 is abandoned quickly.
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 1);

  MiniDFSCluster cluster = null;
  try {
    cluster = createMiniDFSCluster(conf, 3);
    cluster.transitionToActive(2);
    final EditLogTailer tailer = Mockito.spy(
        cluster.getNamesystem(0).getEditLogTailer());

    // The background tail thread must not run during this test.
    tailer.setShouldRunForTest(false);

    // Counts how many times the mocked proxy's doWork() is entered.
    final AtomicInteger doWorkCalls = new AtomicInteger(0);

    // triggerActiveLogRoll runs on nn0, whose remote NameNodes are [nn1, nn2].
    // Replace the NameNode proxy with one that spins until interrupted (and then
    // raises InterruptedIOException) when talking to nn1, and performs a real
    // rollEditLog for any other NameNode.
    when(tailer.getNameNodeProxy()).thenReturn(
        tailer.new MultipleNameNodeProxy<Void>() {
          @Override
          protected Void doWork() throws IOException {
            doWorkCalls.getAndIncrement();
            if (!tailer.getCurrentNN().getNameNodeID().equals("nn1")) {
              tailer.getCachedActiveProxy().rollEditLog();
              return null;
            }
            // Busy-wait until the caller's timeout interrupts this thread.
            do {
              Thread.yield();
            } while (!Thread.currentThread().isInterrupted());
            throw new InterruptedIOException("It is an Interrupted IOException.");
          }
        }
    );

    // Baseline: lastRollTimeMs only moves forward after a successful
    // triggerActiveLogRoll.
    final long rollTimeBefore = tailer.getLastRollTimeMs();

    // First attempt. The round-robin proxy targets nn1; the caller times out,
    // doWork() throws InterruptedIOException, and getActiveNodeProxy() sees the
    // interrupted thread and returns null.
    tailer.triggerActiveLogRoll();

    // Second attempt. The request goes to nn2, rollEditLog completes, and
    // lastRollTimeMs is updated.
    tailer.triggerActiveLogRoll();
    final Supplier<Boolean> rollTimeAdvanced =
        () -> tailer.getLastRollTimeMs() > rollTimeBefore;
    GenericTestUtils.waitFor(rollTimeAdvanced, 100, 10000);

    // Exactly one doWork() call per triggerActiveLogRoll.
    assertEquals(2, doWorkCalls.get());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

private static void waitForStandbyToCatchUpWithInProgressEdits(
final NameNode standby, final long activeTxId,
int maxWaitSec) throws Exception {
Expand Down

0 comments on commit 850d3ea

Please sign in to comment.