
HDFS-16521. DFS API to retrieve slow datanodes #4107

Merged on May 2, 2022 (1 commit).
@@ -3491,4 +3491,12 @@ public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
private boolean isLocatedBlocksRefresherEnabled() {
return clientContext.isLocatedBlocksRefresherEnabled();
}

public DatanodeInfo[] slowDatanodeReport() throws IOException {
checkOpen();
try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
return namenode.getSlowDatanodeReport();
}
}

}
@@ -3887,4 +3887,15 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
throws IOException {
return new FileSystemMultipartUploaderBuilder(this, basePath);
}

/**
* Retrieve stats for slow running datanodes.
*
* @return An array of slow datanode info.
* @throws IOException If an I/O error occurs.
*/
public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
return dfs.slowDatanodeReport();
}

}
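
Since this patch adds a public FileSystem-level API, here is a minimal usage sketch. It is not part of the patch: the cluster URI and the cast are illustrative, and the datanodes must run with dfs.datanode.peer.stats.enabled=true for the report to be non-empty.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class SlowNodesExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes fs.defaultFS points at an HDFS cluster.
        FileSystem fs = FileSystem.get(conf);
        if (fs instanceof DistributedFileSystem) {
          DatanodeInfo[] slowNodes =
              ((DistributedFileSystem) fs).getSlowDatanodeStats();
          for (DatanodeInfo dn : slowNodes) {
            System.out.println("Slow datanode: " + dn.getXferAddr());
          }
        }
      }
    }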
@@ -2318,4 +2318,14 @@ public long getUsed() throws IOException {
}
return this.vfs.getUsed();
}

@Override
public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
if (this.vfs == null) {
return super.getSlowDatanodeStats();
}
checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
return defaultDFS.getSlowDatanodeStats();
}

}
@@ -1868,4 +1868,16 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
*/
@AtMostOnce
void satisfyStoragePolicy(String path) throws IOException;

/**
* Get a report on all slow datanodes. Slow-running datanodes are identified by the outlier
* detection algorithm, provided slow peer tracking is enabled for the DFS cluster.
*
* @return Datanode report for slow running datanodes.
* @throws IOException If an I/O error occurs.
*/
@Idempotent
@ReadOnly
DatanodeInfo[] getSlowDatanodeReport() throws IOException;
Contributor:
I just want to check with everyone that it is okay to have an array of objects as the return value.
I think it's fine, but want to confirm, because once we decide on the interface it can't be changed later.

Contributor Author:

I thought a List would also be fine, but kept it as an array to keep the API contract in line with getDatanodeReport(), so that both APIs can use the same underlying utility methods (e.g. getDatanodeInfoFromDescriptors()).


}
@@ -143,6 +143,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2065,6 +2066,18 @@ public void satisfyStoragePolicy(String src) throws IOException {
}
}

@Override
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
GetSlowDatanodeReportRequestProto req =
GetSlowDatanodeReportRequestProto.newBuilder().build();
try {
return PBHelperClient.convert(
rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

@Override
public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException {
@@ -424,6 +424,13 @@ message GetPreferredBlockSizeResponseProto {
required uint64 bsize = 1;
}

message GetSlowDatanodeReportRequestProto {
}

message GetSlowDatanodeReportResponseProto {
repeated DatanodeInfoProto datanodeInfoProto = 1;
}

enum SafeModeActionProto {
SAFEMODE_LEAVE = 1;
SAFEMODE_ENTER = 2;
@@ -1070,4 +1077,6 @@ service ClientNamenodeProtocol {
returns(SatisfyStoragePolicyResponseProto);
rpc getHAServiceState(HAServiceStateRequestProto)
returns(HAServiceStateResponseProto);
rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
returns(GetSlowDatanodeReportResponseProto);
}
@@ -76,7 +76,8 @@ public class TestReadOnly {
"getQuotaUsage",
"msync",
"getHAServiceState",
"getECTopologyResultForPolicies"
"getECTopologyResultForPolicies",
"getSlowDatanodeReport"
)
);

@@ -1815,6 +1815,12 @@ public void satisfyStoragePolicy(String path) throws IOException {
storagePolicy.satisfyStoragePolicy(path);
}

@Override
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
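// requireResponse=true: every registered namespace must report; timeOutMs=0: no explicit timeout on the fan-out RPC.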
return rpcServer.getSlowDatanodeReport(true, 0);
}

@Override
public HAServiceProtocol.HAServiceState getHAServiceState() {
if (rpcServer.isSafeMode()) {
@@ -1095,24 +1095,7 @@ public DatanodeInfo[] getDatanodeReport(
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
DatanodeInfo[] result = entry.getValue();
for (DatanodeInfo node : result) {
String nodeId = node.getXferAddr();
DatanodeInfo dn = datanodesMap.get(nodeId);
if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
// Add the subcluster as a suffix to the network location
node.setNetworkLocation(
NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
node.getNetworkLocation());
datanodesMap.put(nodeId, node);
} else {
LOG.debug("{} is in multiple subclusters", nodeId);
}
}
}
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
@@ -1578,6 +1561,11 @@ public void satisfyStoragePolicy(String path) throws IOException {
clientProto.satisfyStoragePolicy(path);
}

@Override // ClientProtocol
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
return clientProto.getSlowDatanodeReport();
}

@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
long minBlockSize, long hotBlockTimeInterval) throws IOException {
@@ -1994,6 +1982,53 @@ public String refreshFairnessPolicyController() {
return rpcClient.refreshFairnessPolicyController(new Configuration());
}

/**
* Get the slow running datanodes report with a timeout.
*
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return Array of slow datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, DatanodeInfo[]> results =
rpcClient.invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
}

private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
Map<String, DatanodeInfo> datanodesMap) {
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
DatanodeInfo[] result = entry.getValue();
for (DatanodeInfo node : result) {
String nodeId = node.getXferAddr();
DatanodeInfo dn = datanodesMap.get(nodeId);
if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
// Prepend the subcluster's nameservice ID to the network location
node.setNetworkLocation(
NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
node.getNetworkLocation());
datanodesMap.put(nodeId, node);
} else {
LOG.debug("{} is in multiple subclusters", nodeId);
}
}
}
}

/**
* Deals with loading datanode report into the cache and refresh.
*/
@@ -704,6 +704,7 @@ public void testProxyGetDatanodeReport() throws Exception {

DatanodeInfo[] combinedData =
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
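// Slow peer tracking is not enabled in this test cluster, so the slow datanode report is empty.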
assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
final Map<Integer, String> routerDNMap = new TreeMap<>();
for (DatanodeInfo dn : combinedData) {
String subcluster = dn.getNetworkLocation().split("/")[1];
@@ -156,6 +156,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;
@@ -2058,4 +2060,18 @@ public HAServiceStateResponseProto getHAServiceState(
throw new ServiceException(e);
}
}

@Override
public GetSlowDatanodeReportResponseProto getSlowDatanodeReport(RpcController controller,
GetSlowDatanodeReportRequestProto request) throws ServiceException {
try {
List<? extends DatanodeInfoProto> result =
PBHelperClient.convert(server.getSlowDatanodeReport());
return GetSlowDatanodeReportResponseProto.newBuilder()
.addAllDatanodeInfoProto(result)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}
@@ -23,6 +23,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;

import org.apache.hadoop.fs.StorageType;
@@ -1665,7 +1667,17 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
}
return nodes;
}


public List<DatanodeDescriptor> getAllSlowDataNodes() {
if (slowPeerTracker == null) {
LOG.debug("{} is disabled. Try enabling it first to capture slow peer outliers.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
return ImmutableList.of();
}
List<String> slowNodes = slowPeerTracker.getSlowNodes(getNumOfDataNodes());
return getDnDescriptorsFromIpAddr(slowNodes);
}
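
The method above depends on the slow peer tracker, which is off by default. A minimal sketch, not part of the patch, of enabling it programmatically; in a real deployment the key is normally set in hdfs-site.xml on the datanodes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Without this key the slowPeerTracker field stays null and
    // getAllSlowDataNodes() logs a debug message and returns an empty list.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);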

/**
* Checks if name resolution was successful for the given address. If IP
* address and host name are the same, then it means name resolution has
@@ -2148,19 +2160,26 @@ public Set<String> getSlowPeersUuidSet() {
List<String> slowNodes;
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
for (String slowNode : slowNodes) {
if (StringUtils.isBlank(slowNode)
|| !slowNode.contains(IP_PORT_SEPARATOR)) {
List<DatanodeDescriptor> datanodeDescriptors = getDnDescriptorsFromIpAddr(slowNodes);
datanodeDescriptors.forEach(
datanodeDescriptor -> slowPeersUuidSet.add(datanodeDescriptor.getDatanodeUuid()));
return slowPeersUuidSet;
}

private List<DatanodeDescriptor> getDnDescriptorsFromIpAddr(List<String> nodes) {
List<DatanodeDescriptor> datanodeDescriptors = new ArrayList<>();
for (String node : nodes) {
if (StringUtils.isBlank(node) || !node.contains(IP_PORT_SEPARATOR)) {
continue;
}
String ipAddr = slowNode.split(IP_PORT_SEPARATOR)[0];
String ipAddr = node.split(IP_PORT_SEPARATOR)[0];
DatanodeDescriptor datanodeByHost =
host2DatanodeMap.getDatanodeByHost(ipAddr);
if (datanodeByHost != null) {
slowPeersUuidSet.add(datanodeByHost.getDatanodeUuid());
datanodeDescriptors.add(datanodeByHost);
}
}
return slowPeersUuidSet;
return datanodeDescriptors;
}

/**
@@ -54,6 +54,8 @@ public class DataNodePeerMetrics {

private final String name;

// Strictly for use by test code only. Production code is not supposed to set this.
private Map<String, Double> testOutlier = null;

private final OutlierDetector slowNodeDetector;

@@ -142,14 +144,28 @@ public void collectThreadLocalStates() {
* than their peers.
*/
public Map<String, Double> getOutliers() {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAverages.getStats(
minOutlierDetectionSamples);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);

return slowNodeDetector.getOutliers(stats);
// testOutlier is only ever set by tests; in production it stays null.
if (testOutlier == null) {
// This maps the metric name to the aggregate latency.
// The metric name is the datanode ID.
final Map<String, Double> stats =
sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
return slowNodeDetector.getOutliers(stats);
} else {
// this happens only for test code.
return testOutlier;
}
}

/**
* Strictly for test code only. Production code is not supposed to use this. This method
* directly sets the outlier mapping so that tests do not have to build up real aggregate latency metrics.
*
* @param outlier outlier directly set by tests.
*/
public void setTestOutliers(Map<String, Double> outlier) {
Contributor:
This is a little awkward? Add comment on the testOutlier data member that it is for test only?

Contributor Author:

Yeah, it's very difficult to reproduce an actual slow node in a unit test, hence this approach. Sure, I added a comment on the testOutlier member as well (in addition to this setter method's Javadoc).

this.testOutlier = outlier;
}
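
A rough sketch of how a test might use this hook; it is not from the patch, and the MiniDFSCluster handle and datanode address are illustrative:

    import java.util.Map;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;

    // Mark one datanode as a slow outlier directly, instead of generating
    // real latency samples against its peers.
    DataNode dn = cluster.getDataNodes().get(0); // cluster: an existing MiniDFSCluster
    Map<String, Double> outliers = ImmutableMap.of("127.0.0.1:9866", 15.5);
    dn.getPeerMetrics().setTestOutliers(outliers);
    // getOutliers() now returns the injected map, bypassing the
    // sendPacketDownstreamRollingAverages computation.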

public MutableRollingAverages getSendPacketDownstreamRollingAverages() {