Skip to content

Commit

Permalink
YARN-11180. Refactor some code of getNewApplication, submitApplication, forceKillApplication, getApplicationReport.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Jul 24, 2022
1 parent 8f83d9f commit bfef1dc
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,35 +227,29 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
ApplicationClientProtocol clientRMProxy = null;
try {
boolean serviceAuthEnabled = getConf().getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user;
if (serviceAuthEnabled) {
realUser = UserGroupInformation.createProxyUser(
user.getShortUserName(), UserGroupInformation.getLoginUser());
user.getShortUserName(), UserGroupInformation.getLoginUser());
}
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser);
} catch (Exception e) {
RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster "
+ subClusterId,
e);
"Unable to create the interface to reach the SubCluster " + subClusterId, e);
}

clientRMProxies.put(subClusterId, clientRMProxy);
return clientRMProxy;
}

/**
 * Picks one sub-cluster id at random from the keys of the supplied map.
 *
 * @param activeSubClusters map whose keys are the candidate sub-cluster ids
 * @return the key found at index {@code rand.nextInt(list.size())} of a list copy of the key set
 * @throws YarnException declared by the signature; a null or empty map is reported through
 *         {@code RouterServerUtil.logAndThrowException} with
 *         {@code FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE}
 */
private SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {

if (activeSubclusters == null || activeSubclusters.size() < 1) {
Map<SubClusterId, SubClusterInfo> activeSubClusters) throws YarnException {
// Nothing to choose from: hand the condition to the shared log-and-throw helper.
// NOTE(review): the code below relies on that helper actually throwing - confirm.
if (activeSubClusters == null || activeSubClusters.size() < 1) {
RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());

// Copy the key set into a list so that an element can be selected by index.
List<SubClusterId> list = new ArrayList<>(activeSubClusters.keySet());
return list.get(rand.nextInt(list.size()));
}

Expand All @@ -280,47 +274,50 @@ private SubClusterId getRandomActiveSubCluster(
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
// Makes up to numSubmitRetries attempts, each against a randomly chosen sub-cluster, and
// returns the first non-null response. Every outcome is recorded in routerMetrics and
// RouterAuditLogger.

long startTime = clock.getTime();
// A null request is counted as a failed creation, audited, and reported through
// logAndThrowException.
if(request == null) {
routerMetrics.incrAppsFailedCreated();
String errMsg = "Missing getNewApplication request.";
RouterAuditLogger.logFailure(user.getShortUserName(),
RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN",
"RouterClientRMService", errMsg);
RouterServerUtil.logAndThrowException(errMsg, null);
}

long startTime = clock.getTime();
// Candidate sub-clusters for this request; entries are removed below as attempts fail.
// NOTE(review): presumably the `true` argument restricts the result to active
// sub-clusters - confirm against the facade.
Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true);

GetNewApplicationResponse response = null;

for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
LOG.debug(
"getNewApplication try #{} on SubCluster {}", i, subClusterId);
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
LOG.info("getNewApplication try #{} on SubCluster {}", i, subClusterId);
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
// Clear the result before each attempt.
response = null;
try {
response = clientRMProxy.getNewApplication(request);
} catch (Exception e) {
LOG.warn("Unable to create a new ApplicationId in SubCluster "
+ subClusterId.getId(), e);
LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e);
// Drop the sub-cluster that threw so later iterations choose among the remaining ones.
// NOTE(review): response is still null here, so the else branch below removes the
// same key a second time.
subClustersActive.remove(subClusterId);
}

if (response != null) {

// Success: record the elapsed time since startTime, audit it, and stop retrying.
long stopTime = clock.getTime();
routerMetrics.succeededAppsCreated(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(),
RouterAuditLogger.AuditConstants.GET_NEW_APP,
"RouterClientRMService", response.getApplicationId());
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
subClustersActive.remove(subClusterId);
}

}

// Every attempt ended without a response: count, audit, and report the failure.
routerMetrics.incrAppsFailedCreated();
String errMsg = "Fail to create a new application.";
String errMsg = "Failed to create a new application.";
RouterAuditLogger.logFailure(user.getShortUserName(),
RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN",
"RouterClientRMService", errMsg);
throw new YarnException(errMsg);
RouterServerUtil.logAndThrowException(errMsg, null);
// NOTE(review): presumably unreachable if logAndThrowException always throws; this looks
// like it exists only to satisfy the compiler's return-path check - confirm.
return response;
}

/**
Expand Down Expand Up @@ -392,32 +389,31 @@ public GetNewApplicationResponse getNewApplication(
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {

long startTime = clock.getTime();

if (request == null || request.getApplicationSubmissionContext() == null
|| request.getApplicationSubmissionContext()
.getApplicationId() == null) {
|| request.getApplicationSubmissionContext().getApplicationId() == null) {
routerMetrics.incrAppsFailedSubmitted();
String errMsg =
"Missing submitApplication request or applicationSubmissionContext "
+ "information.";
"Missing submitApplication request or applicationSubmissionContext information.";
RouterAuditLogger.logFailure(user.getShortUserName(),
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
"RouterClientRMService", errMsg);
throw new YarnException(errMsg);
RouterServerUtil.logAndThrowException(errMsg, null);
}

long startTime = clock.getTime();

ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();

List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
List<SubClusterId> blacklist = new ArrayList<>();

for (int i = 0; i < numSubmitRetries; ++i) {

SubClusterId subClusterId = policyFacade.getHomeSubcluster(
request.getApplicationSubmissionContext(), blacklist);
LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i,
subClusterId);

LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
applicationId, i, subClusterId);

ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
Expand All @@ -430,32 +426,34 @@ public SubmitApplicationResponse submitApplication(
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted();
String message = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore";
String message =
String.format("Unable to insert the ApplicationId %s into the FederationStateStore.",
applicationId);
RouterAuditLogger.logFailure(user.getShortUserName(),
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
"RouterClientRMService", message, applicationId, subClusterId);
throw new YarnException(message, e);
RouterServerUtil.logAndThrowException(message, e);
}
} else {
try {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String message = "Unable to update the ApplicationId " + applicationId
+ " into the FederationStateStore";
String message =
String.format("Unable to update the ApplicationId %s into the FederationStateStore.",
applicationId);
SubClusterId subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.", applicationId,
subClusterId);
LOG.info("Application {} already submitted on SubCluster {}.",
applicationId, subClusterId);
} else {
routerMetrics.incrAppsFailedSubmitted();
RouterAuditLogger.logFailure(user.getShortUserName(),
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
"RouterClientRMService", message, applicationId, subClusterId);
throw new YarnException(message, e);
RouterServerUtil.logAndThrowException(message, e);
}
}
}
Expand Down Expand Up @@ -489,9 +487,8 @@ public SubmitApplicationResponse submitApplication(
}

routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Application "
+ request.getApplicationSubmissionContext().getApplicationName()
+ " with appId " + applicationId + " failed to be submitted.";
String errMsg = String.format("Application %s with appId %s failed to be submitted.",
request.getApplicationSubmissionContext().getApplicationName(), applicationId);
RouterAuditLogger.logFailure(user.getShortUserName(),
RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
"RouterClientRMService", errMsg, applicationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void setUp() {
interceptor.setConf(this.getConf());
interceptor.init(user);

subClusters = new ArrayList<SubClusterId>();
subClusters = new ArrayList<>();

try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
Expand Down Expand Up @@ -201,8 +201,7 @@ protected YarnConfiguration createConfiguration() {
* ApplicationId has to belong to one of the SubCluster in the cluster.
*/
@Test
public void testGetNewApplication()
throws YarnException, IOException, InterruptedException {
public void testGetNewApplication() throws YarnException, IOException {
LOG.info("Test FederationClientInterceptor: Get New Application");

GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
Expand Down

0 comments on commit bfef1dc

Please sign in to comment.