Skip to content

Commit

Permalink
Addressing review comments and some other formatting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
akshatb1 committed Jun 25, 2021
1 parent d662c59 commit 0497bfc
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void initializeMemberVariables() {
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
.add(YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
configurationPrefixToSkipCompare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -635,9 +636,8 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
ArrayList<SubClusterId> clusterIds = new ArrayList<>(subclusters.keySet());
Map<SubClusterId, GetApplicationsResponse> applications =
invokeConcurrent(clusterIds, remoteMethod,
invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class);

// Merge the Application Reports
Expand Down Expand Up @@ -715,6 +715,11 @@ public Object call() throws Exception {
}
return results;
}
<R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
ClientMethod request, Class<R> clazz) throws YarnException, IOException {
ArrayList<SubClusterId> clusterIdList = new ArrayList<>(clusterIds);
return invokeConcurrent(clusterIdList, request, clazz);
}

@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ public static GetApplicationsResponse mergeApplications(
mergeAMWithUAM(federationAM.get(appId), appReport);
} else if (federationUAMSum.containsKey(appId)) {
// Merge the current UAM with its own UAM and update the list of UAM
federationUAMSum.put(appId,
mergeUAMWithUAM(federationUAMSum.get(appId), appReport));
ApplicationReport mergedUAMReport =
mergeUAMWithUAM(federationUAMSum.get(appId), appReport);
federationUAMSum.put(appId, mergedUAMReport);
} else {
// Insert in the list of UAM
federationUAMSum.put(appId, appReport);
Expand Down Expand Up @@ -180,9 +181,13 @@ private static void mergeAMWithUAM(ApplicationReport am,
*/
private static boolean mergeUamToReport(String appName,
boolean returnPartialResult){

return returnPartialResult || (appName != null &&
!(appName.startsWith(UnmanagedApplicationManager.APP_NAME) ||
appName.startsWith(PARTIAL_REPORT)));
if (returnPartialResult) {
return true;
}
if (appName == null) {
return false;
}
return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) ||
appName.startsWith(PARTIAL_REPORT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,7 @@ public void testGetClusterMetricsRequest() throws YarnException, IOException {
@Test
public void testGetApplicationsResponse()
throws YarnException, IOException, InterruptedException {
LOG.info("Test FederationClientInterceptor: " +
"Get Applications Response");
LOG.info("Test FederationClientInterceptor: Get Applications Response");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);

Expand All @@ -573,7 +572,7 @@ public void testGetApplicationsResponse()
*/
@Test
public void testGetApplicationsNullRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Applications request");
LOG.info("Test FederationClientInterceptor: Get Applications request");
LambdaTestUtils.intercept(YarnException.class,
"Missing getApplications request.",
() -> interceptor.getApplications(null));
Expand All @@ -586,11 +585,11 @@ public void testGetApplicationsNullRequest() throws Exception {
*/
@Test
public void testGetApplicationsApplicationTypeNotExists() throws Exception{
LOG.info("Test FederationClientInterceptor :" +
" Application with type does not exist");
LOG.info("Test FederationClientInterceptor: Application with type does "
+ "not exist");

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(System.currentTimeMillis(), 1);

SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
Expand All @@ -604,7 +603,7 @@ public void testGetApplicationsApplicationTypeNotExists() throws Exception{
GetApplicationsRequest.newInstance(appTypes);

GetApplicationsResponse responseGet =
interceptor.getApplications(requestGet);
interceptor.getApplications(requestGet);

Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
Expand All @@ -617,11 +616,11 @@ public void testGetApplicationsApplicationTypeNotExists() throws Exception{
*/
@Test
public void testGetApplicationsApplicationStateNotExists() throws Exception{
LOG.info("Test FederationClientInterceptor :" +
" Application with state does not exist");
LOG.info("Test FederationClientInterceptor:" +
" Application with state does not exist");

ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationId.newInstance(System.currentTimeMillis(), 1);

SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
Expand All @@ -630,16 +629,16 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception{
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));

EnumSet<YarnApplicationState> applicationStates = EnumSet.noneOf(
YarnApplicationState.class);
YarnApplicationState.class);
applicationStates.add(YarnApplicationState.KILLED);

GetApplicationsRequest requestGet =
GetApplicationsRequest.newInstance(applicationStates);

GetApplicationsResponse responseGet =
interceptor.getApplications(requestGet);
interceptor.getApplications(requestGet);

Assert.assertNotNull(responseGet);
Assert.assertEquals(0, responseGet.getApplicationList().size());
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
*/
public class TestRouterYarnClientUtils {

private final static String PARTIAL_REPORT = "Partial Report ";

@Test
public void testClusterMetricsMerge() {
ArrayList<GetClusterMetricsResponse> responses = new ArrayList<>();
Expand Down Expand Up @@ -72,27 +74,54 @@ public GetClusterMetricsResponse getClusterMetricsResponse(int value) {
@Test
public void testMergeApplications() {
ArrayList<GetApplicationsResponse> responses = new ArrayList<>();
responses.add(getApplicationsResponse(1));
responses.add(getApplicationsResponse(2));
responses.add(getApplicationsResponse(1, false));
responses.add(getApplicationsResponse(2, false));
GetApplicationsResponse result = RouterYarnClientUtils.
mergeApplications(responses, false);
Assert.assertNotNull(result);
Assert.assertEquals(2, result.getApplicationList().size());
}

/**
* This test validates the correctness of
* RouterYarnClientUtils#mergeApplications.
*/
@Test
public void testMergeUnmanagedApplications() {
ArrayList<GetApplicationsResponse> responses = new ArrayList<>();
responses.add(getApplicationsResponse(1, true));

// Check response if partial results are disabled
GetApplicationsResponse result = RouterYarnClientUtils.
mergeApplications(responses, false);
Assert.assertNotNull(result);
Assert.assertTrue(result.getApplicationList().isEmpty());

// Check response if partial results are enabled
result = RouterYarnClientUtils.
mergeApplications(responses, true);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getApplicationList().size());
String appName = result.getApplicationList().iterator().next().getName();
Assert.assertTrue(appName.startsWith(PARTIAL_REPORT));
}

/**
* This generates a GetApplicationsResponse with 2 applications with
* same ApplicationId. One of them is added with host value equal to
* null to validate unmanaged application merge with managed application.
* same ApplicationId.
* @param value Used as Id in ApplicationId
* @param uamOnly If set to true, only unmanaged applications are added in
* response, else one managed and one unmanaged applications
* are added with same ApplicationId.
* @return GetApplicationsResponse
*/
private GetApplicationsResponse getApplicationsResponse(int value) {
private GetApplicationsResponse getApplicationsResponse(int value,
boolean uamOnly) {
String host = uamOnly? null: "host";
List<ApplicationReport> applications = new ArrayList<>();

//Add managed application to list
ApplicationId appId = ApplicationId.newInstance(1234,
value);
ApplicationId appId = ApplicationId.newInstance(1234, value);
Resource resource = Resource.newInstance(1024, 1);
ApplicationResourceUsageReport appResourceUsageReport =
ApplicationResourceUsageReport.newInstance(
Expand All @@ -101,27 +130,24 @@ private GetApplicationsResponse getApplicationsResponse(int value) {
0.1f, null);

ApplicationReport appReport = ApplicationReport.newInstance(
appId, ApplicationAttemptId.newInstance(appId,
1),
"user", "queue", "appname", "host",
appId, ApplicationAttemptId.newInstance(appId, 1),
"user", "queue", "appname", host,
124, null, YarnApplicationState.RUNNING,
"diagnostics", "url", 0, 0,
0, FinalApplicationStatus.SUCCEEDED,
appResourceUsageReport,
"N/A", 0.53789f, "YARN",
null);

//Add unmanaged application to list
ApplicationId appId2 = ApplicationId.newInstance(1234,
value);
// Add unmanaged application to list
ApplicationId appId2 = ApplicationId.newInstance(1234, value);
ApplicationReport appReport2 = ApplicationReport.newInstance(
appId2, ApplicationAttemptId.newInstance(appId2,
1),
appId2, ApplicationAttemptId.newInstance(appId2, 1),
"user", "queue", "appname", null, 124,
null, YarnApplicationState.RUNNING,
"diagnostics", "url", 0, 0,
0, FinalApplicationStatus.SUCCEEDED,
appResourceUsageReport, "N/A", 0.53789f,
0, FinalApplicationStatus.SUCCEEDED, appResourceUsageReport,
"N/A", 0.53789f,
"YARN", null);

applications.add(appReport);
Expand Down

0 comments on commit 0497bfc

Please sign in to comment.