Skip to content

HDFS-17250. EditLogTailer#triggerActiveLogRoll should handle thread Interrupted #6266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,15 @@ public T call() throws IOException {
private NamenodeProtocol getActiveNodeProxy() throws IOException {
if (cachedActiveProxy == null) {
while (true) {
// If the thread is interrupted, quit by returning null.
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Interrupted while trying to getActiveNodeProxy.");
return null;
}

// if we have reached the max loop count, quit by returning null
if ((nnLoopCount / nnCount) >= maxRetries) {
LOG.warn("Have reached the max loop count ({}).", nnLoopCount);
return null;
}

Expand All @@ -638,4 +645,24 @@ private NamenodeProtocol getActiveNodeProxy() throws IOException {
return cachedActiveProxy;
}
}

/**
 * Test-only accessor for the proxy cached by {@code getActiveNodeProxy()}.
 *
 * @return the current value of {@code cachedActiveProxy}; {@code null} when
 *         no proxy is cached.
 */
@VisibleForTesting
public NamenodeProtocol getCachedActiveProxy() {
  return this.cachedActiveProxy;
}

/**
 * Test-only accessor for the {@code lastRollTimeMs} field.
 *
 * @return the current value of {@code lastRollTimeMs}. NOTE(review):
 *         presumably a millisecond timestamp of the last edit log roll;
 *         confirm where the field is written.
 */
@VisibleForTesting
public long getLastRollTimeMs() {
  return this.lastRollTimeMs;
}

/**
 * Test-only accessor for the {@code currentNN} field.
 *
 * @return the {@link RemoteNameNodeInfo} currently held in {@code currentNN}.
 */
@VisibleForTesting
public RemoteNameNodeInfo getCurrentNN() {
  return this.currentNN;
}

/**
 * Test-only hook that forwards {@code shouldRun} to
 * {@code tailerThread.setShouldRun(boolean)}.
 *
 * @param shouldRun value handed unchanged to the tailer thread.
 *        NOTE(review): presumably {@code false} makes the tailer thread stop
 *        its loop; confirm against the tailer thread implementation.
 */
@VisibleForTesting
public void setShouldRunForTest(boolean shouldRun) {
this.tailerThread.setShouldRun(shouldRun);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
Expand Down Expand Up @@ -462,6 +463,80 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
}
}

/**
 * Checks that {@code triggerActiveLogRoll} still works after an earlier
 * attempt was interrupted.
 *
 * <p>The test installs a stubbed {@code MultipleNameNodeProxy} whose
 * {@code doWork()} spins until its thread is interrupted when the current
 * target is "nn1", and performs a real {@code rollEditLog()} otherwise. It
 * then triggers two rolls and expects {@code doWork()} to have run exactly
 * twice and {@code lastRollTimeMs} to have advanced.
 */
@Test
public void testRollEditLogHandleThreadInterruption()
    throws IOException, InterruptedException, TimeoutException {
  final Configuration conf = getConf();
  // Keep the rollEdits timeout short (1s) so the first attempt times out quickly.
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 1);

  MiniDFSCluster cluster = null;
  try {
    cluster = createMiniDFSCluster(conf, 3);
    cluster.transitionToActive(2);

    final EditLogTailer tailer =
        Mockito.spy(cluster.getNamesystem(0).getEditLogTailer());

    // The background tail thread must not interfere with the manual triggers below.
    tailer.setShouldRunForTest(false);

    // Counts how many times the stubbed doWork() is entered.
    final AtomicInteger doWorkCalls = new AtomicInteger(0);

    // nn0 runs triggerActiveLogRoll, so the candidate NameNodes are [nn1, nn2].
    // Replace the NameNode proxy with one that blocks on nn1 until interrupted
    // and only performs the real roll for any other target.
    when(tailer.getNameNodeProxy()).thenReturn(
        tailer.new MultipleNameNodeProxy<Void>() {
          @Override
          protected Void doWork() throws IOException {
            doWorkCalls.incrementAndGet();

            final boolean targetIsNn1 =
                tailer.getCurrentNN().getNameNodeID().equals("nn1");
            if (!targetIsNn1) {
              tailer.getCachedActiveProxy().rollEditLog();
              return null;
            }

            // Busy-wait until the caller interrupts this thread, then surface
            // the interruption as an InterruptedIOException.
            do {
              Thread.yield();
            } while (!Thread.currentThread().isInterrupted());
            throw new InterruptedIOException("It is an Interrupted IOException.");
          }
        });

    // Baseline; expected to change only after a successful triggerActiveLogRoll.
    final long rollTimeBefore = tailer.getLastRollTimeMs();

    // First trigger: the round-robin proxy targets nn1, the caller times out,
    // doWork() throws InterruptedIOException, and getActiveNodeProxy() is
    // expected to notice the interrupted thread and give up by returning null.
    tailer.triggerActiveLogRoll();

    // Second trigger: the request goes to nn2, the roll succeeds and
    // lastRollTimeMs is expected to move forward.
    tailer.triggerActiveLogRoll();
    GenericTestUtils.waitFor(
        () -> tailer.getLastRollTimeMs() > rollTimeBefore, 100, 10000);

    // Exactly one doWork() call per trigger.
    assertEquals(2, doWorkCalls.get());
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}

private static void waitForStandbyToCatchUpWithInProgressEdits(
final NameNode standby, final long activeTxId,
int maxWaitSec) throws Exception {
Expand Down