Skip to content

Commit

Permalink
HDFS-16171. De-flake testDecommissionStatus (#3280)
Browse files Browse the repository at this point in the history
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 6342d5e)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
  • Loading branch information
virajjasani authored and sunchao committed Jan 4, 2022
1 parent c6914b1 commit 544dffd
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,28 @@ public static void assertExceptionContains(String expectedText,
public static void waitFor(final Supplier<Boolean> check,
    final long checkEveryMillis, final long waitForMillis)
    throws TimeoutException, InterruptedException {
  // Delegate to the overload that accepts an error message, supplying none.
  final String noErrorMsg = null;
  waitFor(check, checkEveryMillis, waitForMillis, noErrorMsg);
}

/**
* Wait for the specified test to return true. The test will be performed
* initially and then every {@code checkEveryMillis} until at least
* {@code waitForMillis} time has expired. If {@code check} is null or
* {@code waitForMillis} is less than {@code checkEveryMillis} this method
* will throw an {@link IllegalArgumentException}.
*
* @param check the test to perform.
* @param checkEveryMillis how often to perform the test.
* @param waitForMillis the amount of time after which no more tests will be
* performed.
* @param errorMsg error message to provide in TimeoutException.
* @throws TimeoutException if the test does not return true in the allotted
* time.
* @throws InterruptedException if the method is interrupted while waiting.
*/
public static void waitFor(final Supplier<Boolean> check,
final long checkEveryMillis, final long waitForMillis,
final String errorMsg) throws TimeoutException, InterruptedException {
Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
if (waitForMillis < checkEveryMillis) {
throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
Expand All @@ -432,9 +454,12 @@ public static void waitFor(final Supplier<Boolean> check,
}

if (!result) {
throw new TimeoutException("Timed out waiting for condition. " +
"Thread diagnostics:\n" +
TimedOutTestsListener.buildThreadDiagnosticString());
final String exceptionErrorMsg = "Timed out waiting for condition. "
+ (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)
? "Error Message: " + errorMsg : "")
+ "\nThread diagnostics:\n" +
TimedOutTestsListener.buildThreadDiagnosticString();
throw new TimeoutException(exceptionErrorMsg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import java.util.Arrays;
import java.util.List;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
Expand Down Expand Up @@ -57,11 +60,12 @@
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/**
* This class tests the decommissioning of nodes.
Expand All @@ -75,7 +79,8 @@ public class TestDecommissioningStatus {
private FileSystem fileSys;
private HostsFileWriter hostsFileWriter;
private Configuration conf;
private Logger LOG;
private static final Logger LOG =
LoggerFactory.getLogger(TestDecommissioningStatus.class);

final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);

Expand Down Expand Up @@ -107,8 +112,8 @@ protected Configuration setupConfig() throws Exception {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG);
LOG = Logger.getLogger(TestDecommissioningStatus.class);
GenericTestUtils.setLogLevel(
LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG);
return conf;
}

Expand Down Expand Up @@ -163,17 +168,30 @@ protected void decommissionNode(String dnName)

protected void checkDecommissionStatus(DatanodeDescriptor decommNode,
int expectedUnderRep, int expectedDecommissionOnly,
int expectedUnderRepInOpenFiles) {
assertEquals("Unexpected num under-replicated blocks",
expectedUnderRep,
decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks());
assertEquals("Unexpected number of decom-only replicas",
expectedDecommissionOnly,
decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
assertEquals(
"Unexpected number of replicas in under-replicated open files",
expectedUnderRepInOpenFiles,
decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());
int expectedUnderRepInOpenFiles) throws TimeoutException,
InterruptedException {
String errorMsg;
errorMsg = "Under replicated blocks. Expected: "
+ expectedUnderRep + " , Actual: "
+ decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks();
GenericTestUtils.waitFor(
() -> expectedUnderRep == decommNode.getLeavingServiceStatus()
.getUnderReplicatedBlocks(),
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
errorMsg = "OutOfService only replicas. Expected: "
+ expectedDecommissionOnly + " , Actual: "
+ decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas();
GenericTestUtils.waitFor(
() -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus()
.getOutOfServiceOnlyReplicas(),
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
errorMsg = "UnderReplicated in open files. Expected: "
+ expectedUnderRepInOpenFiles + " , Actual: "
+ decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles();
GenericTestUtils.waitFor(
() -> expectedUnderRepInOpenFiles == decommNode
.getLeavingServiceStatus().getUnderReplicatedInOpenFiles(),
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
}

protected void checkDFSAdminDecommissionStatus(
Expand Down Expand Up @@ -268,6 +286,7 @@ public void testDecommissionStatus() throws Exception {

FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
verifyInitialState(fsn, dm);
for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(client, iteration);
dm.refreshNodes(conf);
Expand All @@ -276,14 +295,13 @@ public void testDecommissionStatus() throws Exception {
// Block until the admin's monitor updates the number of tracked nodes.
waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1);
final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
assertEquals(decommissioningNodes.size(), iteration + 1);
if (iteration == 0) {
assertEquals(decommissioningNodes.size(), 1);
DatanodeDescriptor decommNode = decommissioningNodes.get(0);
checkDecommissionStatus(decommNode, 3, 0, 1);
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
fileSys, admin);
} else {
assertEquals(decommissioningNodes.size(), 2);
DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
// This one is still 3,3,1 since it passed over the UC block
Expand All @@ -305,6 +323,69 @@ public void testDecommissionStatus() throws Exception {
AdminStatesBaseTest.cleanupFile(fileSys, file2);
}

// Why do we verify the initial state of the DataNodes here?
// Before we start the actual decommission testing, we should ensure that
// a total of 8 blocks (the original 4 blocks of 2 files plus 4 replicas) are
// present across the two available DataNodes. If we don't wait until all 8
// blocks are reported live by the BlockManager, one of the replicas might not
// yet be present on any DataNode when the decommissioning process starts.
// That would make the test flaky, because the counts (number of
// under-replicated blocks, number of outOfService-only replicas, number of
// under-replicated blocks in open files) would be incorrect.
/**
 * Waits for the cluster to reach the state the decommission tests start
 * from, failing with an {@link AssertionError} if it does not get there.
 *
 * <p>Three conditions are checked in order: every datanode reports zero
 * for all three leaving-service counters, the BlockManager reports 4 total
 * blocks and 4 replicated blocks, and the datanodes together hold 8 blocks
 * with exactly 2 datanodes holding 4 blocks each. The last two conditions
 * are each polled up to 5 times, sleeping 3 seconds between attempts.
 *
 * @param fsn namesystem whose BlockManager supplies the block counts.
 * @param dm datanode manager used to enumerate the datanodes.
 * @throws InterruptedException if the thread is interrupted while sleeping
 *     between polling attempts.
 */
protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm)
    throws InterruptedException {
  // Number of re-checks after the first attempt, and the pause between them.
  final int maxRetries = 4;
  final long retryIntervalMillis = 3000;

  dm.getDatanodes().forEach(datanodeDescriptor -> {
    try {
      checkDecommissionStatus(datanodeDescriptor, 0, 0, 0);
    } catch (TimeoutException e) {
      throw new AssertionError("Datanode not in good state.", e);
    } catch (InterruptedException e) {
      // The forEach lambda cannot propagate a checked exception, so restore
      // the interrupt status before failing; otherwise the interruption
      // would be invisible to code further up the stack.
      Thread.currentThread().interrupt();
      throw new AssertionError(
          "Interrupted while verifying Datanode state.", e);
    }
  });

  // Wait until the BlockManager's view of the block counts has settled.
  for (int attempt = 0;; attempt++) {
    final int totalBlocks = fsn.getBlockManager().getTotalBlocks();
    final long totalReplicatedBlocks =
        fsn.getBlockManager().getTotalReplicatedBlocks();
    if (totalBlocks == 4 && totalReplicatedBlocks == 4) {
      break;
    }
    if (attempt == maxRetries) {
      throw new AssertionError("Unexpected Total blocks " + totalBlocks
          + " and replicated blocks " + totalReplicatedBlocks);
    }
    Thread.sleep(retryIntervalMillis);
  }

  // Wait until every datanode reports its full share of the blocks.
  final AtomicInteger total = new AtomicInteger(0);
  final AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0);
  for (int attempt = 0;; attempt++) {
    total.set(0);
    sufficientBlocksSuccess.set(0);
    dm.getDatanodes().forEach(datanodeDescriptor -> {
      // Read the count once so the running total and the per-node check
      // are based on the same value.
      final int blocksOnNode = datanodeDescriptor.numBlocks();
      total.addAndGet(blocksOnNode);
      if (blocksOnNode == 4) {
        sufficientBlocksSuccess.incrementAndGet();
      }
    });
    if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) {
      break;
    }
    if (attempt == maxRetries) {
      throw new AssertionError("Unexpected Total blocks " + total.get()
          + " from Datanode Storage. 4 blocks per Datanode Storage"
          + " expected from each DataNode");
    }
    Thread.sleep(retryIntervalMillis);
  }
}

/**
* Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
* as dead before decommission has completed. That will allow DN to resume
Expand Down Expand Up @@ -388,8 +469,8 @@ public void testDecommissionStatusAfterDNRestart() throws Exception {
*/
@Test(timeout=120000)
public void testDecommissionDeadDN() throws Exception {
Logger log = Logger.getLogger(DatanodeAdminManager.class);
log.setLevel(Level.DEBUG);
Logger log = LoggerFactory.getLogger(DatanodeAdminManager.class);
GenericTestUtils.setLogLevel(log, Level.DEBUG);
DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
String dnName = dnID.getXferAddr();
DataNodeProperties stoppedDN = cluster.stopDataNode(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void testDecommissionStatus() throws Exception {

FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
verifyInitialState(fsn, dm);
for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(client, iteration);
dm.refreshNodes(conf);
Expand Down

0 comments on commit 544dffd

Please sign in to comment.