18
18
19
19
package org .apache .hadoop .yarn .server .router .clientrm ;
20
20
21
- import org .apache .commons .collections . CollectionUtils ;
21
+ import org .apache .commons .lang3 . StringUtils ;
22
22
import org .apache .commons .lang3 .tuple .Pair ;
23
23
import org .apache .hadoop .thirdparty .com .google .common .collect .Maps ;
24
24
import org .apache .hadoop .thirdparty .com .google .common .util .concurrent .ThreadFactoryBuilder ;
25
25
import java .io .IOException ;
26
26
import java .lang .reflect .InvocationTargetException ;
27
27
import java .lang .reflect .Method ;
28
- import java .util .*;
28
+ import java .util .ArrayList ;
29
+ import java .util .Collection ;
30
+ import java .util .List ;
31
+ import java .util .Map ;
32
+ import java .util .Random ;
33
+ import java .util .TreeMap ;
34
+ import java .util .Set ;
29
35
import java .util .concurrent .BlockingQueue ;
30
36
import java .util .concurrent .Callable ;
31
37
import java .util .concurrent .ConcurrentHashMap ;
@@ -660,7 +666,7 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
660
666
long startTime = clock .getTime ();
661
667
ClientMethod remoteMethod = new ClientMethod ("getApplications" ,
662
668
new Class [] {GetApplicationsRequest .class }, new Object [] {request });
663
- Map < SubClusterId , GetApplicationsResponse > applications = null ;
669
+ Collection < GetApplicationsResponse > applications = null ;
664
670
try {
665
671
applications = invokeConcurrent (remoteMethod , GetApplicationsResponse .class );
666
672
} catch (Exception ex ) {
@@ -670,7 +676,7 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
670
676
long stopTime = clock .getTime ();
671
677
routerMetrics .succeededMultipleAppsRetrieved (stopTime - startTime );
672
678
// Merge the Application Reports
673
- return RouterYarnClientUtils .mergeApplications (applications . values () , returnPartialReport );
679
+ return RouterYarnClientUtils .mergeApplications (applications , returnPartialReport );
674
680
}
675
681
676
682
@ Override
@@ -685,8 +691,7 @@ public GetClusterMetricsResponse getClusterMetrics(
685
691
new Class [] {GetClusterMetricsRequest .class }, new Object [] {request });
686
692
Collection <GetClusterMetricsResponse > clusterMetrics = null ;
687
693
try {
688
- clusterMetrics = invokeAppClientProtocolMethod (
689
- true , remoteMethod , GetClusterMetricsResponse .class );
694
+ clusterMetrics = invokeConcurrent (remoteMethod , GetClusterMetricsResponse .class );
690
695
} catch (Exception ex ) {
691
696
routerMetrics .incrGetClusterMetricsFailedRetrieved ();
692
697
RouterServerUtil .logAndThrowException ("Unable to get cluster metrics due to exception." , ex );
@@ -696,7 +701,7 @@ public GetClusterMetricsResponse getClusterMetrics(
696
701
return RouterYarnClientUtils .merge (clusterMetrics );
697
702
}
698
703
699
- <R > Map < SubClusterId , R > invokeConcurrent (ClientMethod request , Class <R > clazz )
704
+ <R > Collection < R > invokeConcurrent (ClientMethod request , Class <R > clazz )
700
705
throws YarnException {
701
706
702
707
Collection <SubClusterId > subClusterIds = federationFacade .getActiveSubClusterIds ();
@@ -724,24 +729,31 @@ <R> Map<SubClusterId, R> invokeConcurrent(ClientMethod request, Class<R> clazz)
724
729
futures .stream ().forEach (future -> {
725
730
SubClusterId subClusterId = null ;
726
731
try {
727
- Pair <SubClusterId , Object > mapFuture = future .get ();
728
- subClusterId = mapFuture .getKey ();
729
- Object result = mapFuture .getValue ();
732
+ Pair <SubClusterId , Object > pair = future .get ();
733
+ subClusterId = pair .getKey ();
734
+ Object result = pair .getValue ();
730
735
results .put (subClusterId , clazz .cast (result ));
731
736
} catch (InterruptedException | ExecutionException e ) {
732
737
Throwable cause = e .getCause ();
733
738
LOG .error ("Cannot execute {} on {}: {}" , request .getMethodName (),
734
- subClusterId .getId (), cause .getMessage ());
739
+ subClusterId .getId (), cause .getMessage ());
735
740
exceptions .put (subClusterId , e );
736
741
}
737
742
});
738
743
} catch (InterruptedException e ) {
739
744
throw new YarnException (e );
740
745
}
741
746
747
+ // All sub-clusters return results to be considered successful,
748
+ // otherwise an exception will be thrown.
749
+ if (exceptions != null && !exceptions .isEmpty ()) {
750
+ Set <SubClusterId > subClusterIdSets = exceptions .keySet ();
751
+ throw new YarnException ("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
752
+ StringUtils .join (subClusterIdSets , "," ));
753
+ }
742
754
743
-
744
- return results ;
755
+ // return result
756
+ return results . values () ;
745
757
}
746
758
747
759
@ Override
@@ -752,24 +764,19 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
752
764
RouterServerUtil .logAndThrowException ("Missing getClusterNodes request." , null );
753
765
}
754
766
long startTime = clock .getTime ();
755
- Map <SubClusterId , SubClusterInfo > subClusters =
756
- federationFacade .getSubClusters (true );
757
- Map <SubClusterId , GetClusterNodesResponse > clusterNodes = Maps .newHashMap ();
758
- for (SubClusterId subClusterId : subClusters .keySet ()) {
759
- ApplicationClientProtocol client ;
760
- try {
761
- client = getClientRMProxyForSubCluster (subClusterId );
762
- GetClusterNodesResponse response = client .getClusterNodes (request );
763
- clusterNodes .put (subClusterId , response );
764
- } catch (Exception ex ) {
765
- routerMetrics .incrClusterNodesFailedRetrieved ();
766
- RouterServerUtil .logAndThrowException ("Unable to get cluster nodes due to exception." , ex );
767
- }
767
+ ClientMethod remoteMethod = new ClientMethod ("getClusterNodes" ,
768
+ new Class []{GetClusterNodesRequest .class }, new Object []{request });
769
+ Collection <GetClusterNodesResponse > clusterNodes = null ;
770
+ try {
771
+ clusterNodes = invokeConcurrent (remoteMethod , GetClusterNodesResponse .class );
772
+ } catch (Exception ex ) {
773
+ routerMetrics .incrClusterNodesFailedRetrieved ();
774
+ RouterServerUtil .logAndThrowException ("Unable to get cluster nodes due to exception." , ex );
768
775
}
769
776
long stopTime = clock .getTime ();
770
777
routerMetrics .succeededGetClusterNodesRetrieved (stopTime - startTime );
771
778
// Merge the NodesResponse
772
- return RouterYarnClientUtils .mergeClusterNodesResponse (clusterNodes . values () );
779
+ return RouterYarnClientUtils .mergeClusterNodesResponse (clusterNodes );
773
780
}
774
781
775
782
@ Override
@@ -785,8 +792,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
785
792
new Class []{GetQueueInfoRequest .class }, new Object []{request });
786
793
Collection <GetQueueInfoResponse > queues = null ;
787
794
try {
788
- queues = invokeAppClientProtocolMethod (true , remoteMethod ,
789
- GetQueueInfoResponse .class );
795
+ queues = invokeConcurrent (remoteMethod , GetQueueInfoResponse .class );
790
796
} catch (Exception ex ) {
791
797
routerMetrics .incrGetQueueInfoFailedRetrieved ();
792
798
RouterServerUtil .logAndThrowException ("Unable to get queue [" +
@@ -810,8 +816,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
810
816
new Class [] {GetQueueUserAclsInfoRequest .class }, new Object [] {request });
811
817
Collection <GetQueueUserAclsInfoResponse > queueUserAcls = null ;
812
818
try {
813
- queueUserAcls = invokeAppClientProtocolMethod (true , remoteMethod ,
814
- GetQueueUserAclsInfoResponse .class );
819
+ queueUserAcls = invokeConcurrent (remoteMethod , GetQueueUserAclsInfoResponse .class );
815
820
} catch (Exception ex ) {
816
821
routerMetrics .incrQueueUserAclsFailedRetrieved ();
817
822
RouterServerUtil .logAndThrowException ("Unable to get queue user Acls due to exception." , ex );
@@ -971,8 +976,7 @@ public ReservationListResponse listReservations(
971
976
new Class [] {ReservationListRequest .class }, new Object [] {request });
972
977
Collection <ReservationListResponse > listResponses = null ;
973
978
try {
974
- listResponses = invokeAppClientProtocolMethod (true , remoteMethod ,
975
- ReservationListResponse .class );
979
+ listResponses = invokeConcurrent (remoteMethod , ReservationListResponse .class );
976
980
} catch (Exception ex ) {
977
981
routerMetrics .incrListReservationsFailedRetrieved ();
978
982
RouterServerUtil .logAndThrowException (
@@ -1051,24 +1055,6 @@ public ReservationDeleteResponse deleteReservation(
1051
1055
throw new YarnException (msg );
1052
1056
}
1053
1057
1054
- private <R > Collection <R > invokeAppClientProtocolMethod (
1055
- Boolean filterInactiveSubClusters , ClientMethod request , Class <R > clazz )
1056
- throws YarnException , RuntimeException {
1057
- Map <SubClusterId , SubClusterInfo > subClusters =
1058
- federationFacade .getSubClusters (filterInactiveSubClusters );
1059
- return subClusters .keySet ().stream ().map (subClusterId -> {
1060
- try {
1061
- ApplicationClientProtocol protocol = getClientRMProxyForSubCluster (subClusterId );
1062
- Method method = ApplicationClientProtocol .class .
1063
- getMethod (request .getMethodName (), request .getTypes ());
1064
- return clazz .cast (method .invoke (protocol , request .getParams ()));
1065
- } catch (YarnException | NoSuchMethodException |
1066
- IllegalAccessException | InvocationTargetException ex ) {
1067
- throw new RuntimeException (ex );
1068
- }
1069
- }).collect (Collectors .toList ());
1070
- }
1071
-
1072
1058
@ Override
1073
1059
public GetNodesToLabelsResponse getNodeToLabels (
1074
1060
GetNodesToLabelsRequest request ) throws YarnException , IOException {
@@ -1081,8 +1067,7 @@ public GetNodesToLabelsResponse getNodeToLabels(
1081
1067
new Class [] {GetNodesToLabelsRequest .class }, new Object [] {request });
1082
1068
Collection <GetNodesToLabelsResponse > clusterNodes = null ;
1083
1069
try {
1084
- clusterNodes = invokeAppClientProtocolMethod (true , remoteMethod ,
1085
- GetNodesToLabelsResponse .class );
1070
+ clusterNodes = invokeConcurrent (remoteMethod , GetNodesToLabelsResponse .class );
1086
1071
} catch (Exception ex ) {
1087
1072
routerMetrics .incrNodeToLabelsFailedRetrieved ();
1088
1073
RouterServerUtil .logAndThrowException ("Unable to get node label due to exception." , ex );
@@ -1105,8 +1090,7 @@ public GetLabelsToNodesResponse getLabelsToNodes(
1105
1090
new Class [] {GetLabelsToNodesRequest .class }, new Object [] {request });
1106
1091
Collection <GetLabelsToNodesResponse > labelNodes = null ;
1107
1092
try {
1108
- labelNodes = invokeAppClientProtocolMethod (true , remoteMethod ,
1109
- GetLabelsToNodesResponse .class );
1093
+ labelNodes = invokeConcurrent (remoteMethod , GetLabelsToNodesResponse .class );
1110
1094
} catch (Exception ex ) {
1111
1095
routerMetrics .incrLabelsToNodesFailedRetrieved ();
1112
1096
RouterServerUtil .logAndThrowException ("Unable to get label node due to exception." , ex );
@@ -1129,8 +1113,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels(
1129
1113
new Class [] {GetClusterNodeLabelsRequest .class }, new Object [] {request });
1130
1114
Collection <GetClusterNodeLabelsResponse > nodeLabels = null ;
1131
1115
try {
1132
- nodeLabels = invokeAppClientProtocolMethod (true , remoteMethod ,
1133
- GetClusterNodeLabelsResponse .class );
1116
+ nodeLabels = invokeConcurrent (remoteMethod , GetClusterNodeLabelsResponse .class );
1134
1117
} catch (Exception ex ) {
1135
1118
routerMetrics .incrClusterNodeLabelsFailedRetrieved ();
1136
1119
RouterServerUtil .logAndThrowException ("Unable to get cluster nodeLabels due to exception." ,
@@ -1542,8 +1525,7 @@ public GetAllResourceProfilesResponse getResourceProfiles(
1542
1525
new Class [] {GetAllResourceProfilesRequest .class }, new Object [] {request });
1543
1526
Collection <GetAllResourceProfilesResponse > resourceProfiles = null ;
1544
1527
try {
1545
- resourceProfiles = invokeAppClientProtocolMethod (true , remoteMethod ,
1546
- GetAllResourceProfilesResponse .class );
1528
+ resourceProfiles = invokeConcurrent (remoteMethod , GetAllResourceProfilesResponse .class );
1547
1529
} catch (Exception ex ) {
1548
1530
routerMetrics .incrGetResourceProfilesFailedRetrieved ();
1549
1531
RouterServerUtil .logAndThrowException ("Unable to get resource profiles due to exception." ,
@@ -1567,8 +1549,7 @@ public GetResourceProfileResponse getResourceProfile(
1567
1549
new Class [] {GetResourceProfileRequest .class }, new Object [] {request });
1568
1550
Collection <GetResourceProfileResponse > resourceProfile = null ;
1569
1551
try {
1570
- resourceProfile = invokeAppClientProtocolMethod (true , remoteMethod ,
1571
- GetResourceProfileResponse .class );
1552
+ resourceProfile = invokeConcurrent (remoteMethod , GetResourceProfileResponse .class );
1572
1553
} catch (Exception ex ) {
1573
1554
routerMetrics .incrGetResourceProfileFailedRetrieved ();
1574
1555
RouterServerUtil .logAndThrowException ("Unable to get resource profile due to exception." ,
@@ -1591,8 +1572,7 @@ public GetAllResourceTypeInfoResponse getResourceTypeInfo(
1591
1572
new Class [] {GetAllResourceTypeInfoRequest .class }, new Object [] {request });
1592
1573
Collection <GetAllResourceTypeInfoResponse > listResourceTypeInfo ;
1593
1574
try {
1594
- listResourceTypeInfo = invokeAppClientProtocolMethod (true , remoteMethod ,
1595
- GetAllResourceTypeInfoResponse .class );
1575
+ listResourceTypeInfo = invokeConcurrent (remoteMethod , GetAllResourceTypeInfoResponse .class );
1596
1576
} catch (Exception ex ) {
1597
1577
routerMetrics .incrResourceTypeInfoFailedRetrieved ();
1598
1578
LOG .error ("Unable to get all resource type info node due to exception." , ex );
@@ -1623,8 +1603,7 @@ public GetAttributesToNodesResponse getAttributesToNodes(
1623
1603
new Class [] {GetAttributesToNodesRequest .class }, new Object [] {request });
1624
1604
Collection <GetAttributesToNodesResponse > attributesToNodesResponses = null ;
1625
1605
try {
1626
- attributesToNodesResponses = invokeAppClientProtocolMethod (true , remoteMethod ,
1627
- GetAttributesToNodesResponse .class );
1606
+ attributesToNodesResponses = invokeConcurrent (remoteMethod , GetAttributesToNodesResponse .class );
1628
1607
} catch (Exception ex ) {
1629
1608
routerMetrics .incrGetAttributesToNodesFailedRetrieved ();
1630
1609
RouterServerUtil .logAndThrowException ("Unable to get attributes to nodes due to exception." ,
@@ -1647,7 +1626,7 @@ public GetClusterNodeAttributesResponse getClusterNodeAttributes(
1647
1626
new Class [] {GetClusterNodeAttributesRequest .class }, new Object [] {request });
1648
1627
Collection <GetClusterNodeAttributesResponse > clusterNodeAttributesResponses = null ;
1649
1628
try {
1650
- clusterNodeAttributesResponses = invokeAppClientProtocolMethod ( true , remoteMethod ,
1629
+ clusterNodeAttributesResponses = invokeConcurrent ( remoteMethod ,
1651
1630
GetClusterNodeAttributesResponse .class );
1652
1631
} catch (Exception ex ) {
1653
1632
routerMetrics .incrGetClusterNodeAttributesFailedRetrieved ();
@@ -1672,7 +1651,7 @@ public GetNodesToAttributesResponse getNodesToAttributes(
1672
1651
new Class [] {GetNodesToAttributesRequest .class }, new Object [] {request });
1673
1652
Collection <GetNodesToAttributesResponse > nodesToAttributesResponses = null ;
1674
1653
try {
1675
- nodesToAttributesResponses = invokeAppClientProtocolMethod ( true , remoteMethod ,
1654
+ nodesToAttributesResponses = invokeConcurrent ( remoteMethod ,
1676
1655
GetNodesToAttributesResponse .class );
1677
1656
} catch (Exception ex ) {
1678
1657
routerMetrics .incrGetNodesToAttributesFailedRetrieved ();
0 commit comments