diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 35875095b946cb..756b273c8ac1f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1102,7 +1102,8 @@ public PipelineAck.ECN getECN() {
     double load = ManagementFactory.getOperatingSystemMXBean()
         .getSystemLoadAverage();
     double threshold = NUM_CORES * congestionRatio;
-    if (load > threshold) {
+
+    if (load > threshold || DataNodeFaultInjector.get().mockCongestedForTest()) {
       metrics.incrCongestedCount();
     }
     return load > threshold ? PipelineAck.ECN.CONGESTED :
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7b116d9e566f3c..4c022535118bd2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -162,4 +162,11 @@ public void markSlow(String dnAddr, int[] replies) {}
    * Just delay delete replica a while.
    */
   public void delayDeleteReplica() {}
+
+  /**
+   * Test hook: when overridden to return true, getECN() counts the
+   * DataNode as congested regardless of the actual system load.
+   */
+  public boolean mockCongestedForTest() {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 3a0b5238360a23..3b46c29330f2b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -42,9 +42,11 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.util.Lists;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -816,4 +818,32 @@ public Boolean get() {
       }, 100, 10000);
     }
   }
+
+  @Test
+  public void testCongestedCount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, true);
+    // Capture the injector BEFORE the try so the finally can always
+    // restore it, even when cluster construction throws.
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public boolean mockCongestedForTest() {
+          return true;
+        }
+      });
+      // getECN() consults the injector and bumps the CongestedCount metric.
+      cluster.getDataNodes().get(0).getECN();
+      MetricsRecordBuilder dnMetrics =
+          getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+      Assert.assertEquals(1L, getLongCounter("CongestedCount", dnMetrics));
+    } finally {
+      // Unconditionally reset the injector so mocked state cannot leak
+      // into other tests; shut the cluster down only if it was started.
+      DataNodeFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }