HDFS-16484. [SPS]: Fix an infinite loop bug in SPSPathIdProcessor thread (#4032)

(cherry picked from commit 4539443)
liubingxing authored and tasanuma committed Apr 13, 2022
1 parent 2d2631a commit cb14e8d
Showing 2 changed files with 68 additions and 1 deletion.
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode.sps;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -228,15 +229,18 @@ public synchronized void clearQueuesWithNotification() {
* IDs to process for satisfying the policy.
*/
private class SPSPathIdProcessor implements Runnable {
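// Maximum number of attempts to process the same path id after failures;
// once reached, the id is skipped so this thread cannot loop on it forever.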
private static final int MAX_RETRY_COUNT = 3;

@Override
public void run() {
LOG.info("Starting SPSPathIdProcessor!.");
Long startINode = null;
int retryCount = 0;
while (ctxt.isRunning()) {
try {
if (!ctxt.isInSafeMode()) {
if (startINode == null) {
retryCount = 0;
startINode = ctxt.getNextSPSPath();
} // else same id will be retried
if (startINode == null) {
@@ -249,7 +253,12 @@ public void run() {
pendingWorkForDirectory.get(startINode);
if (dirPendingWorkInfo != null
&& dirPendingWorkInfo.isDirWorkDone()) {
ctxt.removeSPSHint(startINode);
try {
ctxt.removeSPSHint(startINode);
} catch (FileNotFoundException e) {
// ignore if the file no longer exists
startINode = null;
}
pendingWorkForDirectory.remove(startINode);
}
}
@@ -269,6 +278,11 @@
LOG.info("Interrupted while waiting in SPSPathIdProcessor", t);
break;
}
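// The current pass failed with an exception; count it, and once the retry
// budget is exhausted drop this inode so the thread can move on.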
retryCount++;
if (retryCount >= MAX_RETRY_COUNT) {
LOG.warn("Skipping this inode {} due to too many retries.", startINode);
startINode = null;
}
}
}
}
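For context, the change above boils down to a bounded-retry pattern: the processor keeps a per-path retry counter and abandons the path once the budget is exhausted instead of retrying it indefinitely. The following minimal, self-contained sketch illustrates that pattern in isolation; RetryCapSketch, processPath and the literal queue are illustrative stand-ins, not the actual SPS classes or API.

import java.util.ArrayDeque;
import java.util.Queue;

public class RetryCapSketch {
  private static final int MAX_RETRY_COUNT = 3;

  public static void main(String[] args) {
    Queue<Long> paths = new ArrayDeque<>();
    paths.add(1L);
    paths.add(2L);

    Long current = null;
    int retryCount = 0;
    while (!paths.isEmpty() || current != null) {
      if (current == null) {
        current = paths.poll();   // pick up the next path id
        retryCount = 0;           // fresh retry budget for a new id
      }
      if (processPath(current)) {
        current = null;           // processed successfully, move on
        continue;
      }
      retryCount++;
      if (retryCount >= MAX_RETRY_COUNT) {
        // Without this cap, a path that keeps failing would be retried forever.
        System.out.println("Skipping path " + current + " after "
            + retryCount + " attempts");
        current = null;
      }
    }
  }

  private static boolean processPath(long id) {
    return id % 2 == 0;           // pretend odd ids always fail
  }
}

As in the diff above, the counter is reset only when a new path id is picked up, so retries of the same id accumulate until it either succeeds or is skipped.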
@@ -189,6 +189,20 @@ private void shutdownCluster() {
}
}

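// Helpers to stop and restart the external SPS so a test can queue
// satisfy requests while the satisfier is down.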
private void stopExternalSps() {
if (externalSps != null) {
externalSps.stopGracefully();
}
}

private void startExternalSps() {
externalSps = new StoragePolicySatisfier(getConf());
externalCtxt = new ExternalSPSContext(externalSps, nnc);

externalSps.init(externalCtxt);
externalSps.start(StoragePolicySatisfierMode.EXTERNAL);
}

private void createCluster() throws IOException {
getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
@@ -1341,6 +1355,45 @@ public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
}
}

/**
* Test SPS when a file has a satisfy request queued and is then deleted before SPS starts.
*/
@Test(timeout = 300000)
public void testSPSSatisfyAndThenDeleteFileBeforeStartSPS() throws Exception {
try {
createCluster();
HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config);

StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}};
startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);

stopExternalSps();

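// Queue a satisfy request while SPS is stopped, then delete the file so
// the queued path points at an inode that no longer exists.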
dfs.setStoragePolicy(new Path(FILE), COLD);
hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
dfs.delete(new Path(FILE), true);

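// Restart SPS; with the FileNotFoundException handling above, the deleted
// path is dropped instead of being retried forever.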
startExternalSps();

String file1 = "/testMoveToSatisfyStoragePolicy_1";
writeContent(file1);
dfs.setStoragePolicy(new Path(file1), COLD);
hdfsAdmin.satisfyStoragePolicy(new Path(file1));

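// Verify that SPS still satisfies new requests after skipping the deleted path.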
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(file1, StorageType.ARCHIVE, 3, 30000,
dfs);
} finally {
shutdownCluster();
}
}


/**
* Test SPS for directory which has multilevel directories.
*/
