From bfef1dc1ac9adbedd64de3e7cfe92889d263401c Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 23 Jul 2022 19:55:10 -0700 Subject: [PATCH] YARN-11180. Refactor some code of getNewApplication, submitApplication, forceKillApplication, getApplicationReport. --- .../clientrm/FederationClientInterceptor.java | 95 +++++++++---------- .../TestFederationClientInterceptor.java | 5 +- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 45cec6415027c..76ca3e035378b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -227,35 +227,29 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( ApplicationClientProtocol clientRMProxy = null; try { boolean serviceAuthEnabled = getConf().getBoolean( - CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); UserGroupInformation realUser = user; if (serviceAuthEnabled) { realUser = UserGroupInformation.createProxyUser( - user.getShortUserName(), UserGroupInformation.getLoginUser()); + user.getShortUserName(), UserGroupInformation.getLoginUser()); } clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), ApplicationClientProtocol.class, subClusterId, realUser); } catch (Exception e) { RouterServerUtil.logAndThrowException( - "Unable to create the interface to reach the SubCluster " - + subClusterId, - e); + "Unable to create the interface to reach the SubCluster " + subClusterId, e); } - clientRMProxies.put(subClusterId, clientRMProxy); return clientRMProxy; } private SubClusterId getRandomActiveSubCluster( - Map activeSubclusters) - throws YarnException { - - if (activeSubclusters == null || activeSubclusters.size() < 1) { + Map activeSubClusters) throws YarnException { + if (activeSubClusters == null || activeSubClusters.size() < 1) { RouterServerUtil.logAndThrowException( FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); } - List list = new ArrayList<>(activeSubclusters.keySet()); - + List list = new ArrayList<>(activeSubClusters.keySet()); return list.get(rand.nextInt(list.size())); } @@ -280,47 +274,50 @@ private SubClusterId getRandomActiveSubCluster( public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); + if(request == null) { + routerMetrics.incrAppsFailedCreated(); + String errMsg = "Missing getNewApplication request."; + RouterAuditLogger.logFailure(user.getShortUserName(), + RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN", + "RouterClientRMService", errMsg); + RouterServerUtil.logAndThrowException(errMsg, null); + } + long startTime = clock.getTime(); Map subClustersActive = federationFacade.getSubClusters(true); + GetNewApplicationResponse response = null; + for (int i = 0; i < numSubmitRetries; ++i) { SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); - LOG.debug( - "getNewApplication try #{} on SubCluster {}", i, subClusterId); - ApplicationClientProtocol clientRMProxy = - getClientRMProxyForSubCluster(subClusterId); - GetNewApplicationResponse response = null; + LOG.info("getNewApplication try #{} on SubCluster {}", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + response = null; try { response = clientRMProxy.getNewApplication(request); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster " - + subClusterId.getId(), e); + LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); + subClustersActive.remove(subClusterId); } if (response != null) { - long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); RouterAuditLogger.logSuccess(user.getShortUserName(), RouterAuditLogger.AuditConstants.GET_NEW_APP, "RouterClientRMService", response.getApplicationId()); return response; - } else { - // Empty response from the ResourceManager. - // Blacklist this subcluster for this request. - subClustersActive.remove(subClusterId); } - } routerMetrics.incrAppsFailedCreated(); - String errMsg = "Fail to create a new application."; + String errMsg = "Failed to create a new application."; RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN", "RouterClientRMService", errMsg); - throw new YarnException(errMsg); + RouterServerUtil.logAndThrowException(errMsg, null); + return response; } /** @@ -392,32 +389,31 @@ public GetNewApplicationResponse getNewApplication( public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); - if (request == null || request.getApplicationSubmissionContext() == null - || request.getApplicationSubmissionContext() - .getApplicationId() == null) { + || request.getApplicationSubmissionContext().getApplicationId() == null) { routerMetrics.incrAppsFailedSubmitted(); String errMsg = - "Missing submitApplication request or applicationSubmissionContext " - + "information."; + "Missing submitApplication request or applicationSubmissionContext information."; RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", "RouterClientRMService", errMsg); - throw new YarnException(errMsg); + RouterServerUtil.logAndThrowException(errMsg, null); } + long startTime = clock.getTime(); + ApplicationId applicationId = request.getApplicationSubmissionContext().getApplicationId(); - List blacklist = new ArrayList(); + List blacklist = new ArrayList<>(); for (int i = 0; i < numSubmitRetries; ++i) { SubClusterId subClusterId = policyFacade.getHomeSubcluster( request.getApplicationSubmissionContext(), blacklist); - LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i, - subClusterId); + + LOG.info("submitApplication appId {} try #{} on SubCluster {}.", + applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); @@ -430,12 +426,13 @@ public SubmitApplicationResponse submitApplication( federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - String message = "Unable to insert the ApplicationId " + applicationId - + " into the FederationStateStore"; + String message = + String.format("Unable to insert the ApplicationId %s into the FederationStateStore.", + applicationId); RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", "RouterClientRMService", message, applicationId, subClusterId); - throw new YarnException(message, e); + RouterServerUtil.logAndThrowException(message, e); } } else { try { @@ -443,19 +440,20 @@ public SubmitApplicationResponse submitApplication( // the new subClusterId we have selected federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); } catch (YarnException e) { - String message = "Unable to update the ApplicationId " + applicationId - + " into the FederationStateStore"; + String message = + String.format("Unable to update the ApplicationId %s into the FederationStateStore.", + applicationId); SubClusterId subClusterIdInStateStore = federationFacade.getApplicationHomeSubCluster(applicationId); if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application {} already submitted on SubCluster {}.", applicationId, - subClusterId); + LOG.info("Application {} already submitted on SubCluster {}.", + applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", "RouterClientRMService", message, applicationId, subClusterId); - throw new YarnException(message, e); + RouterServerUtil.logAndThrowException(message, e); } } } @@ -489,9 +487,8 @@ public SubmitApplicationResponse submitApplication( } routerMetrics.incrAppsFailedSubmitted(); - String errMsg = "Application " - + request.getApplicationSubmissionContext().getApplicationName() - + " with appId " + applicationId + " failed to be submitted."; + String errMsg = String.format("Application %s with appId %s failed to be submitted.", + request.getApplicationSubmissionContext().getApplicationName(), applicationId); RouterAuditLogger.logFailure(user.getShortUserName(), RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", "RouterClientRMService", errMsg, applicationId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 49872e5a41beb..90f1e978824fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -152,7 +152,7 @@ public void setUp() { interceptor.setConf(this.getConf()); interceptor.init(user); - subClusters = new ArrayList(); + subClusters = new ArrayList<>(); try { for (int i = 0; i < NUM_SUBCLUSTER; i++) { @@ -201,8 +201,7 @@ protected YarnConfiguration createConfiguration() { * ApplicationId has to belong to one of the SubCluster in the cluster. */ @Test - public void testGetNewApplication() - throws YarnException, IOException, InterruptedException { + public void testGetNewApplication() throws YarnException, IOException { LOG.info("Test FederationClientInterceptor: Get New Application"); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();