Skip to content

Commit

Permalink
YARN-10829. Support getApplications API in FederationClientInterceptor (
Browse files Browse the repository at this point in the history
#3135)

YARN-10829. Support getApplications API in FederationClientInterceptor (#3135)
  • Loading branch information
akshatb1 authored Jul 23, 2021
1 parent 3a52bfc commit aa1a5dd
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.yarn.api.protocolrecords;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand Down Expand Up @@ -52,6 +54,16 @@ public static GetApplicationsResponse newInstance(
return response;
}

/**
 * Builds a {@code GetApplicationsResponse} from an arbitrary collection of
 * application reports. The reports are copied into a new list, so the
 * response does not share the caller's collection.
 * @param applications the <code>ApplicationReport</code>s to return
 * @return a response holding a copy of {@code applications}
 */
@Private
@Unstable
public static GetApplicationsResponse newInstance(
    Collection<ApplicationReport> applications) {
  // Snapshot the input as a List before handing it to the record.
  List<ApplicationReport> reports = new ArrayList<>(applications);
  GetApplicationsResponse result =
      Records.newRecord(GetApplicationsResponse.class);
  result.setApplicationList(reports);
  return result;
}

/**
* Get <code>ApplicationReport</code> for applications.
* @return <code>ApplicationReport</code> for applications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3991,6 +3991,15 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;

/**
* Whether the Router's FederationClientInterceptor should return partial
* ApplicationReports, i.e. reports of unmanaged applications for which no
* matching application master report was found while merging the
* getApplications results.
*/
public static final String ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
ROUTER_PREFIX + "partial-result.enabled";
/** Partial ApplicationReports are not returned by default. */
public static final boolean DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED =
false;

public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp.";

public static final String ROUTER_USER_CLIENT_THREADS_SIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ public void initializeMemberVariables() {

configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -161,6 +162,7 @@ public class FederationClientInterceptor
private RouterMetrics routerMetrics;
private ThreadPoolExecutor executorService;
private final Clock clock = new MonotonicClock();
// Read in init() from YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED
// and passed to RouterYarnClientUtils.mergeApplications by getApplications.
private boolean returnPartialReport;

@Override
public void init(String userName) {
Expand Down Expand Up @@ -196,6 +198,10 @@ public void init(String userName) {
clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
routerMetrics = RouterMetrics.getMetrics();

returnPartialReport = conf.getBoolean(
YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
}

@Override
Expand Down Expand Up @@ -599,10 +605,44 @@ public GetApplicationReportResponse getApplicationReport(
return response;
}

/**
 * The Yarn Router will forward the request to all the Yarn RMs in parallel,
 * after that it will group all the ApplicationReports by the ApplicationId.
 *
 * Possible failure:
 *
 * Client: identical behavior as {@code ClientRMService}.
 *
 * Router: the Client will timeout and resubmit the request.
 *
 * ResourceManager: the Router calls each Yarn RM in parallel. In case a
 * Yarn RM fails, a single call will timeout. However the Router will
 * merge the ApplicationReports it got, and provides a partial list to
 * the client.
 *
 * State Store: the Router will timeout and it will retry depending on the
 * FederationFacade settings - if the failure happened before the select
 * operation.
 *
 * @param request the getApplications request to forward; must not be null
 * @return the ApplicationReports of the queried sub-clusters, merged by
 *         ApplicationId
 * @throws YarnException if the request is missing or the remote invocation
 *         reports a YARN-level failure
 * @throws IOException if the remote invocation fails with an I/O error
 */
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
    throws YarnException, IOException {
  // The former "not implemented" throw was removed: left as the first
  // statement it made the whole implementation below unreachable.
  if (request == null) {
    RouterServerUtil.logAndThrowException(
        "Missing getApplications request.",
        null);
  }

  // Sub-clusters to query, as reported by the federation facade.
  Map<SubClusterId, SubClusterInfo> subclusters =
      federationFacade.getSubClusters(true);

  // Describe the remote call once; it is invoked against every sub-cluster.
  ClientMethod remoteMethod = new ClientMethod("getApplications",
      new Class[] {GetApplicationsRequest.class}, new Object[] {request});
  Map<SubClusterId, GetApplicationsResponse> applications =
      invokeConcurrent(subclusters.keySet(), remoteMethod,
          GetApplicationsResponse.class);

  // Merge the Application Reports
  return RouterYarnClientUtils.mergeApplications(applications.values(),
      returnPartialReport);
}

@Override
Expand Down Expand Up @@ -676,6 +716,12 @@ public Object call() throws Exception {
return results;
}

/**
 * Convenience overload that accepts any collection of sub-cluster ids,
 * copies it into an {@code ArrayList} and delegates.
 * @param clusterIds the sub-clusters to invoke the method on
 * @param request the client method to invoke
 * @param clazz the expected response type
 * @param <R> the response type
 * @return the responses keyed by sub-cluster id
 * @throws YarnException propagated from the delegate
 * @throws IOException propagated from the delegate
 */
<R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
    ClientMethod request, Class<R> clazz) throws YarnException, IOException {
  // NOTE(review): the declared ArrayList type presumably selects an
  // ArrayList-based overload defined elsewhere in this class; keep it so the
  // call does not resolve back to this method.
  final ArrayList<SubClusterId> ids = new ArrayList<>(clusterIds.size());
  ids.addAll(clusterIds);
  return invokeConcurrent(ids, request, clazz);
}

@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,25 @@
package org.apache.hadoop.yarn.server.router.clientrm;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Util class for Router Yarn client API calls.
*/
public final class RouterYarnClientUtils {

// Name prefix given to a report built by merging unmanaged application
// reports with each other; also used to recognise such reports later.
private final static String PARTIAL_REPORT = "Partial Report ";

// Private constructor: this final class only exposes static helpers.
private RouterYarnClientUtils() {

}
Expand All @@ -52,4 +63,130 @@ public static GetClusterMetricsResponse merge(
}
return GetClusterMetricsResponse.newInstance(tmp);
}

/**
 * Merges the ApplicationReports of several responses, grouping them by
 * ApplicationId. Resource usage of unmanaged application (UAM) reports is
 * folded into the report of the matching application master (AM). UAM
 * reports that never find an AM are summed per ApplicationId and are only
 * added to the result when {@code mergeUamToReport} accepts them.
 * @param responses the GetApplicationsResponses to merge
 * @param returnPartialResult if the merged ApplicationReports should contain
 *          partial results or not
 * @return the merged GetApplicationsResponse
 */
public static GetApplicationsResponse mergeApplications(
    Collection<GetApplicationsResponse> responses,
    boolean returnPartialResult){
  // Final report per application, keyed by id.
  Map<ApplicationId, ApplicationReport> amReports = new HashMap<>();
  // Running sum of UAM reports whose AM has not been seen (yet).
  Map<ApplicationId, ApplicationReport> orphanUamReports = new HashMap<>();

  for (GetApplicationsResponse response : responses) {
    for (ApplicationReport report : response.getApplicationList()) {
      ApplicationId id = report.getApplicationId();

      if (!report.isUnmanagedApp()) {
        // AM report: record it and absorb any UAM sum collected earlier.
        amReports.put(id, report);
        ApplicationReport pendingUam = orphanUamReports.remove(id);
        if (pendingUam != null) {
          mergeAMWithUAM(report, pendingUam);
        }
        continue;
      }

      // UAM report whose AM is already known: fold it into the AM.
      ApplicationReport knownAm = amReports.get(id);
      if (knownAm != null) {
        mergeAMWithUAM(knownAm, report);
        continue;
      }

      // UAM report without an AM so far: start or extend the UAM sum.
      ApplicationReport uamSum = orphanUamReports.get(id);
      if (uamSum == null) {
        orphanUamReports.put(id, report);
      } else {
        orphanUamReports.put(id, mergeUAMWithUAM(uamSum, report));
      }
    }
  }

  // Decide for every UAM sum left without an AM whether it is reported.
  for (ApplicationReport leftover : orphanUamReports.values()) {
    if (mergeUamToReport(leftover.getName(), returnPartialResult)) {
      amReports.put(leftover.getApplicationId(), leftover);
    }
  }

  return GetApplicationsResponse.newInstance(amReports.values());
}

/**
 * Folds the resource usage of {@code uam2} into {@code uam1} and renames
 * {@code uam1} so that it is recognisable as a partial report.
 * @param uam1 the report that is renamed, updated in place and returned
 * @param uam2 the report whose resource usage is added to {@code uam1}
 * @return {@code uam1}
 */
private static ApplicationReport mergeUAMWithUAM(ApplicationReport uam1,
    ApplicationReport uam2){
  final String partialName = PARTIAL_REPORT + uam1.getApplicationId();
  uam1.setName(partialName);
  mergeAMWithUAM(uam1, uam2);
  return uam1;
}

/**
 * Adds the resource usage of {@code uam} to the resource usage report of
 * {@code am}, updating {@code am} in place.
 * A report without a resource usage report is tolerated: if {@code uam} has
 * none there is nothing to add, and if only {@code am} has none it adopts
 * the usage report of {@code uam}.
 * @param am the report that accumulates the usage
 * @param uam the report whose usage is added
 */
private static void mergeAMWithUAM(ApplicationReport am,
    ApplicationReport uam){
  ApplicationResourceUsageReport amResourceReport =
      am.getApplicationResourceUsageReport();

  ApplicationResourceUsageReport uamResourceReport =
      uam.getApplicationResourceUsageReport();

  if (uamResourceReport == null) {
    // Nothing to contribute; leave the AM report untouched.
    return;
  }
  if (amResourceReport == null) {
    // The AM has no usage yet, so the UAM usage is the whole sum.
    am.setApplicationResourceUsageReport(uamResourceReport);
    return;
  }

  // Container counts.
  amResourceReport.setNumUsedContainers(
      amResourceReport.getNumUsedContainers() +
          uamResourceReport.getNumUsedContainers());

  amResourceReport.setNumReservedContainers(
      amResourceReport.getNumReservedContainers() +
          uamResourceReport.getNumReservedContainers());

  // Resource vectors.
  amResourceReport.setUsedResources(Resources.add(
      amResourceReport.getUsedResources(),
      uamResourceReport.getUsedResources()));

  amResourceReport.setReservedResources(Resources.add(
      amResourceReport.getReservedResources(),
      uamResourceReport.getReservedResources()));

  amResourceReport.setNeededResources(Resources.add(
      amResourceReport.getNeededResources(),
      uamResourceReport.getNeededResources()));

  // Aggregated consumption over time.
  amResourceReport.setMemorySeconds(
      amResourceReport.getMemorySeconds() +
          uamResourceReport.getMemorySeconds());

  amResourceReport.setVcoreSeconds(
      amResourceReport.getVcoreSeconds() +
          uamResourceReport.getVcoreSeconds());

  // Usage percentages are summed like the other figures.
  amResourceReport.setQueueUsagePercentage(
      amResourceReport.getQueueUsagePercentage() +
          uamResourceReport.getQueueUsagePercentage());

  amResourceReport.setClusterUsagePercentage(
      amResourceReport.getClusterUsagePercentage() +
          uamResourceReport.getClusterUsagePercentage());

  am.setApplicationResourceUsageReport(amResourceReport);
}

/**
 * Decides whether an unmanaged application that was not matched with an
 * application master is added to the merged report.
 * @param appName Application Name, may be null
 * @param returnPartialResult if the merge ApplicationReports should contain
 *          partial result or not
 * @return true if partial results are requested; otherwise true only when
 *         the name is non-null and starts with neither
 *         {@code UnmanagedApplicationManager.APP_NAME} nor the partial
 *         report prefix
 */
private static boolean mergeUamToReport(String appName,
    boolean returnPartialResult){
  return returnPartialResult
      || (appName != null
          && !appName.startsWith(UnmanagedApplicationManager.APP_NAME)
          && !appName.startsWith(PARTIAL_REPORT));
}
}
Loading

0 comments on commit aa1a5dd

Please sign in to comment.