
HDFS-16917 Add transfer rate quantile metrics for DataNode reads #5397

Merged · 6 commits · Feb 27, 2023
Metrics.md
@@ -370,6 +370,9 @@ Each metrics record contains tags such as SessionId and Hostname as additional information
|:---- |:---- |
| `BytesWritten` | Total number of bytes written to DataNode |
| `BytesRead` | Total number of bytes read from DataNode |
| `ReadTransferRateNumOps` | Total number of data read transfers |
| `ReadTransferRateAvgTime` | Average transfer rate of bytes read from DataNode, measured in bytes per second. |
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
| `BlocksWritten` | Total number of blocks written to DataNode |
| `BlocksRead` | Total number of blocks read from DataNode |
| `BlocksReplicated` | Total number of blocks replicated |
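The quantile gauges in the table above are only registered when a percentile window is configured. A minimal sketch of the required setting, mirroring the test change below (the constant is the real `DFSConfigKeys` key):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Enable a 60-second rollover window; without this setting only the
// ReadTransferRateNumOps/AvgTime counters are published.
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "60");
```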
DFSUtil.java
@@ -69,6 +69,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -1936,4 +1937,18 @@ public static boolean isParentEntry(final String path, final String parent) {
return path.charAt(parent.length()) == Path.SEPARATOR_CHAR
|| parent.equals(Path.SEPARATOR);
}

/**
* Add transfer rate metrics for valid data read and duration values.
* @param metrics metrics for datanodes
* @param read bytes read
* @param duration read duration
*/
public static void addTransferRateMetric(final DataNodeMetrics metrics, final long read, final long duration) {
if (read >= 0 && duration > 0) {
metrics.addReadTransferRate(read * 1000 / duration);
} else {
LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
}
}
}
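Because the same `duration` also feeds `totalReadTime`, which `DataNodeMetrics` annotates as "Milliseconds spent reading", `read * 1000 / duration` comes out in bytes per second. A worked example matching the unit test below:

```java
// 100 bytes over 10 ms -> 100 * 1000 / 10 = 10,000 bytes/sec, the exact
// value testAddTransferRateMetricForValidValues expects.
long read = 100L;
long durationMs = 10L;
long bytesPerSecond = read * 1000 / durationMs; // 10000
```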
DataXceiver.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import javax.crypto.SecretKey;
@@ -632,6 +633,7 @@ public void readBlock(final ExtendedBlock block,
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
} catch ( SocketException ignored ) {
LOG.trace("{}:Ignoring exception while serving {} to {}",
dnR, block, remoteAddress, ignored);
@@ -1122,6 +1124,7 @@ public void copyBlock(final ExtendedBlock block,
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
datanode.metrics.incrTotalReadTime(duration);
DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);

LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
} catch (IOException ioe) {
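Both read paths, `readBlock` and `copyBlock`, now record the transfer rate alongside the existing byte and time counters. A hedged sketch of the call pattern, assuming elapsed time is tracked in milliseconds as the surrounding counters imply (the method and its arguments are illustrative, not code from DataXceiver):

```java
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.util.Time;

// Hedged sketch; 'metrics' would be datanode.metrics inside DataXceiver,
// and a real call site measures an actual network transfer between the reads
// of the clock (here elapsed may round to 0 ms, which the helper skips).
static void recordRead(DataNodeMetrics metrics, byte[] payload) {
  long begin = Time.monotonicNow();            // milliseconds
  long bytesSent = payload.length;             // stand-in for the bytes transferred
  long elapsed = Time.monotonicNow() - begin;
  DFSUtil.addTransferRateMetric(metrics, bytesSent, elapsed);
}
```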
DataNodeMetrics.java
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong bytesRead;
@Metric("Milliseconds spent reading")
MutableCounterLong totalReadTime;
@Metric private MutableRate readTransferRate;
final private MutableQuantiles[] readTransferRateQuantiles;
@Metric MutableCounterLong blocksWritten;
@Metric MutableCounterLong blocksRead;
@Metric MutableCounterLong blocksReplicated;
@@ -227,6 +229,7 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
readTransferRateQuantiles = new MutableQuantiles[len];

for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -255,6 +258,10 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
"ramDiskBlocksLazyPersistWindows" + interval + "s",
"Time between the RamDisk block write and disk persist in ms",
"ops", "latency", interval);
readTransferRateQuantiles[i] = registry.newQuantiles(
"readTransferRate" + interval + "s",
"Rate at which bytes are read from datanode calculated in bytes per second",
"ops", "rate", interval);
}
}

@@ -316,6 +323,13 @@ public void addIncrementalBlockReport(long latency,
}
}

public void addReadTransferRate(long readTransferRate) {
this.readTransferRate.add(readTransferRate);
for (MutableQuantiles q : readTransferRateQuantiles) {
q.add(readTransferRate);
}
}

public void addCacheReport(long latency) {
cacheReports.add(latency);
}
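Together, the `MutableRate` and the per-interval `MutableQuantiles` publish the names documented in the Metrics.md entry above. For a single 60-second interval (value assumed here for illustration), that works out to:

```java
// Metric names emitted for intervals = {60}:
//   ReadTransferRateNumOps                  (op count, from MutableRate)
//   ReadTransferRateAvgTime                 (mean bytes/sec, from MutableRate)
//   ReadTransferRate60s50thPercentileRate   (from MutableQuantiles)
//   ReadTransferRate60s75thPercentileRate
//   ReadTransferRate60s90thPercentileRate
//   ReadTransferRate60s95thPercentileRate
//   ReadTransferRate60s99thPercentileRate
```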
TestDFSUtil.java
@@ -45,6 +45,7 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;

import java.io.File;
import java.io.IOException;
@@ -71,6 +72,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.http.HttpConfig;
@@ -1108,4 +1110,18 @@ public void testErrorMessageForInvalidNameservice() throws Exception {
LambdaTestUtils.intercept(IOException.class, expectedErrorMessage,
()->DFSUtil.getNNServiceRpcAddressesForCluster(conf));
}

@Test
public void testAddTransferRateMetricForValidValues() {
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DFSUtil.addTransferRateMetric(mockMetrics, 100, 10);
verify(mockMetrics).addReadTransferRate(10000);
}

@Test
public void testAddTransferRateMetricForInvalidValue() {
DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
DFSUtil.addTransferRateMetric(mockMetrics, 100, 0);
verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
}
}
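The guard in `DFSUtil.addTransferRateMetric` also rejects negative byte counts (`read >= 0`). A hypothetical companion test, not part of this patch, that would pin that case down as well:

```java
@Test
public void testAddTransferRateMetricForNegativeBytes() {
  // Hypothetical: a negative read count should be skipped by the read >= 0 guard.
  DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
  DFSUtil.addTransferRateMetric(mockMetrics, -1, 10);
  verify(mockMetrics, never()).addReadTransferRate(anyLong());
}
```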
TestDataNodeMetrics.java
@@ -380,6 +380,7 @@ public void testTimeoutMetric() throws Exception {
@Test(timeout=120000)
public void testDataNodeTimeSpend() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + 60);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
final FileSystem fs = cluster.getFileSystem();
@@ -391,6 +392,7 @@ public void testDataNodeTimeSpend() throws Exception {

final long startWriteValue = getLongCounter("TotalWriteTime", rb);
final long startReadValue = getLongCounter("TotalReadTime", rb);
assertCounter("ReadTransferRateNumOps", 0L, rb);
final AtomicInteger x = new AtomicInteger(0);

// Lets Metric system update latest metrics
@@ -410,6 +412,8 @@ public Boolean get() {
MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
assertCounter("ReadTransferRateNumOps", 1L, rbNew);
assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
return endWriteValue > startWriteValue
&& endReadValue > startReadValue;
}