Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11122. Support getClusterNodes API in FederationClientInterceptor #4274

Merged
merged 17 commits into from
May 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public final class RouterMetrics {
private MutableGaugeInt numAppAttemptsFailedRetrieved;
@Metric("# of getClusterMetrics failed to be retrieved")
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
@Metric("# of getClusterNodes failed to be retrieved")
private MutableGaugeInt numGetClusterNodesFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand All @@ -74,7 +76,8 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;

@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;

/**
* Provide quantile counters for all latencies.
Expand All @@ -86,6 +89,7 @@ public final class RouterMetrics {
private MutableQuantiles getApplicationsReportLatency;
private MutableQuantiles getApplicationAttemptReportLatency;
private MutableQuantiles getClusterMetricsLatency;
private MutableQuantiles getClusterNodesLatency;

private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
Expand All @@ -112,6 +116,10 @@ private RouterMetrics() {
getClusterMetricsLatency =
registry.newQuantiles("getClusterMetricsLatency",
"latency of get cluster metrics", "ops", "latency", 10);

getClusterNodesLatency =
registry.newQuantiles("getClusterNodesLatency",
"latency of get cluster nodes", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -168,6 +176,11 @@ public long getNumSucceededGetClusterMetricsRetrieved(){
return totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetClusterNodesRetrieved(){
return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -203,6 +216,11 @@ public double getLatencySucceededGetClusterMetricsRetrieved() {
return totalSucceededGetClusterMetricsRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetClusterNodesRetrieved() {
return totalSucceededGetClusterNodesRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -238,6 +256,11 @@ public int getClusterMetricsFailedRetrieved() {
return numGetClusterMetricsFailedRetrieved.value();
}

@VisibleForTesting
public int getClusterNodesFailedRetrieved() {
return numGetClusterNodesFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -273,6 +296,11 @@ public void succeededGetClusterMetricsRetrieved(long duration) {
getClusterMetricsLatency.add(duration);
}

public void succeededGetClusterNodesRetrieved(long duration) {
totalSucceededGetClusterNodesRetrieved.add(duration);
getClusterNodesLatency.add(duration);
}

public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -301,4 +329,7 @@ public void incrGetClusterMetricsFailedRetrieved() {
numGetClusterMetricsFailedRetrieved.incr();
}

public void incrClusterNodesFailedRetrieved() {
numGetClusterNodesFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.router.clientrm;

import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -791,7 +792,30 @@ <R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null) {
routerMetrics.incrClusterNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClusters =
federationFacade.getSubClusters(true);
Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
for (SubClusterId subClusterId : subClusters.keySet()) {
ApplicationClientProtocol client;
try {
client = getClientRMProxyForSubCluster(subClusterId);
GetClusterNodesResponse response = client.getClusterNodes(request);
clusterNodes.put(subClusterId, response);
} catch (Exception ex) {
routerMetrics.incrClusterNodesFailedRetrieved();
LOG.error("Unable to get cluster nodes due to exception.", ex);
throw ex;
}
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
// Merge the NodesResponse
return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;

import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
Expand Down Expand Up @@ -194,4 +199,23 @@ private static boolean mergeUamToReport(String appName,
return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) ||
appName.startsWith(PARTIAL_REPORT));
}

/**
* Merges a list of GetClusterNodesResponse.
*
* @param responses a list of GetClusterNodesResponse to merge.
* @return the merged GetClusterNodesResponse.
*/
public static GetClusterNodesResponse mergeClusterNodesResponse(
Collection<GetClusterNodesResponse> responses) {
GetClusterNodesResponse clusterNodesResponse = Records.newRecord(GetClusterNodesResponse.class);
List<NodeReport> nodeReports = new ArrayList<>();
for (GetClusterNodesResponse response : responses) {
if (response != null && response.getNodeReports() != null) {
nodeReports.addAll(response.getNodeReports());
}
}
clusterNodesResponse.setNodeReports(nodeReports);
return clusterNodesResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class TestRouterMetrics {

private static RouterMetrics metrics = RouterMetrics.getMetrics();

private static final Double ASSERT_DOUBLE_DELTA = 0.01;

@BeforeClass
public static void init() {

Expand Down Expand Up @@ -346,6 +348,11 @@ public void getClusterMetrics() {
LOG.info("Mocked: failed getClusterMetrics call");
metrics.incrGetClusterMetricsFailedRetrieved();
}

public void getClusterNodes() {
LOG.info("Mocked: failed getClusterNodes call");
goiri marked this conversation as resolved.
Show resolved Hide resolved
metrics.incrClusterNodesFailedRetrieved();
}
}

// Records successes for all calls
Expand Down Expand Up @@ -392,5 +399,28 @@ public void getClusterMetrics(long duration){
duration);
metrics.succeededGetClusterMetricsRetrieved(duration);
}

public void getClusterNodes(long duration) {
LOG.info("Mocked: successful getClusterNodes call with duration {}", duration);
metrics.succeededGetClusterNodesRetrieved(duration);
}
}

@Test
public void testSucceededGetClusterNodes() {
long totalGoodBefore = metrics.getNumSucceededGetClusterNodesRetrieved();
goodSubCluster.getClusterNodes(150);
Assert.assertEquals(totalGoodBefore + 1, metrics.getNumSucceededGetClusterNodesRetrieved());
Assert.assertEquals(150, metrics.getLatencySucceededGetClusterNodesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getClusterNodes(300);
Assert.assertEquals(totalGoodBefore + 2, metrics.getNumSucceededGetClusterNodesRetrieved());
Assert.assertEquals(225, metrics.getLatencySucceededGetClusterNodesRetrieved(), ASSERT_DOUBLE_DELTA);
}

@Test
public void testGetClusterNodesFailed() {
long totalBadBefore = metrics.getClusterNodesFailedRetrieved();
badSubCluster.getClusterNodes();
Assert.assertEquals(totalBadBefore + 1, metrics.getClusterNodesFailedRetrieved());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
Expand Down Expand Up @@ -641,4 +643,16 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception{
Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
}

@Test
public void testGetClusterNodesRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
() -> interceptor.getClusterNodes(null));
// normal request.
GetClusterNodesResponse response =
interceptor.getClusterNodes(GetClusterNodesRequest.newInstance());
Assert.assertEquals(subClusters.size(), response.getNodeReports().size());
}
}