HDFS-16457. Make fs.getspaceused.classname reconfigurable #4069

Merged: 15 commits, Apr 8, 2022
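
This pull request adds fs.getspaceused.classname to the DataNode's reconfigurable properties, so the GetSpaceUsed implementation used for per-block-pool disk usage accounting (DU, WindowsGetSpaceUsed, or a custom class) can be swapped at runtime instead of requiring a DataNode restart. Any class supplied through the property must be loadable as a GetSpaceUsed subclass; in practice that usually means extending CachingGetSpaceUsed and providing the builder-based constructor, as in the minimal, hypothetical sketch below (the class name LoggingGetSpaceUsed and its behavior are illustrative, not part of this change).

```java
import java.io.IOException;

import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.GetSpaceUsed;

// Hypothetical pluggable implementation, shown only to illustrate the contract.
// CachingGetSpaceUsed runs refresh() from a background thread at the configured
// fs.du.interval, so a subclass only has to recompute the cached value.
public class LoggingGetSpaceUsed extends CachingGetSpaceUsed {

  // GetSpaceUsed.Builder instantiates the configured class reflectively through
  // this builder-based constructor, so pluggable implementations must provide it.
  public LoggingGetSpaceUsed(GetSpaceUsed.Builder builder) throws IOException {
    super(builder);
  }

  @Override
  protected void refresh() {
    // A real implementation would recompute disk usage here and update the cached
    // value returned by getUsed(); this sketch only logs the call.
    System.out.println("refresh() called for " + getClass().getSimpleName());
  }
}
```

Once the DataNode accepts the key as reconfigurable (the diff below), the value would typically be changed in the DataNode's configuration and applied through the standard reconfiguration flow, for example `hdfs dfsadmin -reconfig datanode <host:ipc_port> start`.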
@@ -20,6 +20,7 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
@@ -87,6 +88,9 @@

import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.fs.WindowsGetSpaceUsed;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;

import java.io.BufferedOutputStream;
@@ -349,7 +353,8 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY));
FS_GETSPACEUSED_JITTER_KEY,
FS_GETSPACEUSED_CLASSNAME));

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

@@ -683,6 +688,7 @@ public String reconfigurePropertyImpl(String property, String newVal)
return reconfSlowDiskParameters(property, newVal);
case FS_DU_INTERVAL_KEY:
case FS_GETSPACEUSED_JITTER_KEY:
case FS_GETSPACEUSED_CLASSNAME:
return reconfDfsUsageParameters(property, newVal);
default:
break;
@@ -879,7 +885,7 @@ private String reconfDfsUsageParameters(String property, String newVal)
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(interval, null);
value.updateDfsUsageConfig(interval, null, null);
}
}
} else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
@@ -891,13 +897,33 @@ private String reconfDfsUsageParameters(String property, String newVal)
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(null, jitter);
value.updateDfsUsageConfig(null, jitter, null);
}
}
} else if (property.equals(FS_GETSPACEUSED_CLASSNAME)) {
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
Class<? extends GetSpaceUsed> klass;
if (newVal == null) {
if (Shell.WINDOWS) {
klass = WindowsGetSpaceUsed.class;
} else {
klass = DU.class;
}
} else {
klass = Class.forName(newVal).asSubclass(GetSpaceUsed.class);
}
result = klass.getName();
List<FsVolumeImpl> volumeList = data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(null, null, klass);
}
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException | IOException e) {
} catch (IllegalArgumentException | IOException | ClassNotFoundException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}
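
Two details in the hunk above are worth spelling out. Passing a null value falls back to the platform default, matching the default selection in GetSpaceUsed.Builder: WindowsGetSpaceUsed on Windows and DU everywhere else. A non-null value is validated with Class.forName(...).asSubclass(GetSpaceUsed.class), and the resolved class is then pushed into every BlockPoolSlice of every volume, so the swap applies to all block pools at once; an unknown class name surfaces as a ClassNotFoundException, which the widened catch block converts into a ReconfigurationException. The standalone sketch below restates that resolution rule outside the DataNode; the class and method names are illustrative.

```java
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.fs.WindowsGetSpaceUsed;
import org.apache.hadoop.util.Shell;

// Illustrative restatement of the resolution rule used by reconfDfsUsageParameters.
public class SpaceUsedClassResolution {

  public static Class<? extends GetSpaceUsed> resolve(String newVal)
      throws ClassNotFoundException {
    if (newVal == null) {
      // Same platform default that is applied when nothing is configured.
      return Shell.WINDOWS ? WindowsGetSpaceUsed.class : DU.class;
    }
    // Unknown class names throw ClassNotFoundException; classes that do not
    // implement GetSpaceUsed throw ClassCastException.
    return Class.forName(newVal).asSubclass(GetSpaceUsed.class);
  }
}
```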
@@ -80,6 +80,7 @@

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;

/**
* A block pool slice represents a portion of a block pool stored on a volume.
@@ -240,7 +241,8 @@ public void run() {
SHUTDOWN_HOOK_PRIORITY);
}

public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
public void updateDfsUsageConfig(Long interval, Long jitter, Class<? extends GetSpaceUsed> klass)
throws IOException {
// Close the old dfsUsage if it is CachingGetSpaceUsed.
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed) dfsUsage).close();
@@ -255,6 +257,10 @@ public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException
FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
}

if (klass != null) {
config.setClass(FS_GETSPACEUSED_CLASSNAME, klass, CachingGetSpaceUsed.class);
}
// Start new dfsUsage.
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
.setVolume(volume)
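
On the BlockPoolSlice side, updateDfsUsageConfig gains a class parameter: when it is non-null, the class is recorded in the slice's Configuration via setClass before dfsUsage is rebuilt through the FSCachingGetSpaceUsed.Builder chain above (the hunk is truncated at this point), so the new builder picks the implementation up from that configuration. A minimal, standalone round trip of the Configuration mechanism, assuming the builder resolves the class through Configuration, looks like this:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.GetSpaceUsed;

// Standalone sketch (not PR code): store an implementation class under
// fs.getspaceused.classname and read it back the way a builder would.
public class GetSpaceUsedConfigRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // What updateDfsUsageConfig does when klass != null.
    conf.setClass("fs.getspaceused.classname", DU.class, CachingGetSpaceUsed.class);

    // What the builder side effectively does when constructing the new dfsUsage.
    Class<? extends GetSpaceUsed> resolved =
        conf.getClass("fs.getspaceused.classname", DU.class, GetSpaceUsed.class);
    System.out.println("dfsUsage will use: " + resolved.getName());
  }
}
```

Note that setClass validates the supplied class against its interface argument, which the hunk above sets to CachingGetSpaceUsed, so only caching implementations pass that check.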
@@ -23,6 +23,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -86,6 +87,7 @@ public class TestDataNodeReconfiguration {
private final int NUM_NAME_NODE = 1;
private final int NUM_DATA_NODE = 10;
private MiniDFSCluster cluster;
private static long counter = 0;

@Before
public void Setup() throws IOException {
@@ -756,4 +758,33 @@ public void testDfsUsageParameters() throws ReconfigurationException {
}
}
}

public static class DummyCachingGetSpaceUsed extends CachingGetSpaceUsed {
public DummyCachingGetSpaceUsed(Builder builder) throws IOException {
super(builder.setInterval(1000).setJitter(0L));
}

@Override
protected void refresh() {
counter++;
}
}

@Test
public void testDfsUsageKlass() throws ReconfigurationException, InterruptedException {

long lastCounter = counter;
Thread.sleep(5000);
assertEquals(lastCounter, counter);

for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
dn.reconfigurePropertyImpl(FS_GETSPACEUSED_CLASSNAME,
DummyCachingGetSpaceUsed.class.getName());
}

lastCounter = counter;
Thread.sleep(5000);
assertTrue(counter > lastCounter);
}
}
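
The test above drives the swap by calling reconfigurePropertyImpl on each DataNode directly and then checks that the injected DummyCachingGetSpaceUsed starts receiving refresh() calls. Outside of tests, the same change would go through the public reconfiguration entry point inherited from ReconfigurableBase, or operationally by editing the DataNode configuration and running `hdfs dfsadmin -reconfig datanode <host:ipc_port> start`. A hedged sketch of the programmatic path, assuming the standard Reconfigurable#reconfigureProperty API:

```java
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

// Usage sketch (not part of the PR): apply a new GetSpaceUsed class through the
// generic reconfiguration entry point, which verifies the property is listed as
// reconfigurable before dispatching to reconfigurePropertyImpl.
public class ApplyGetSpaceUsedClass {
  public static void apply(DataNode dn, String className) throws ReconfigurationException {
    dn.reconfigureProperty("fs.getspaceused.classname", className);
  }
}
```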
@@ -339,7 +339,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(18, outs.size());
assertEquals(19, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}
