|
29 | 29 | import java.util.Map;
|
30 | 30 | import java.util.Map.Entry;
|
31 | 31 | import java.util.Set;
|
32 |
| -import java.util.concurrent.Callable; |
33 | 32 | import java.util.concurrent.CompletionService;
|
34 | 33 | import java.util.concurrent.ExecutorCompletionService;
|
35 | 34 | import java.util.concurrent.ExecutorService;
|
36 | 35 | import java.util.concurrent.Future;
|
37 | 36 | import java.util.concurrent.TimeUnit;
|
| 37 | +import java.util.stream.Collectors; |
38 | 38 | import java.util.stream.Stream;
|
39 | 39 |
|
40 | 40 | import javax.servlet.http.HttpServletRequest;
|
@@ -711,63 +711,37 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
|
711 | 711 | AppsInfo apps = new AppsInfo();
|
712 | 712 | long startTime = clock.getTime();
|
713 | 713 |
|
714 |
| - Map<SubClusterId, SubClusterInfo> subClustersActive = null; |
715 |
| - try { |
716 |
| - subClustersActive = federationFacade.getSubClusters(true); |
717 |
| - } catch (YarnException e) { |
718 |
| - routerMetrics.incrMultipleAppsFailedRetrieved(); |
719 |
| - return null; |
720 |
| - } |
721 |
| - |
722 |
| - // Send the requests in parallel |
723 |
| - CompletionService<AppsInfo> compSvc = |
724 |
| - new ExecutorCompletionService<>(this.threadpool); |
725 |
| - |
726 | 714 | // HttpServletRequest does not work with ExecutorCompletionService.
|
727 | 715 | // Create a duplicate hsr.
|
728 | 716 | final HttpServletRequest hsrCopy = clone(hsr);
|
| 717 | + Collection<SubClusterInfo> subClusterInfos = federationFacade.getActiveSubClusters(); |
729 | 718 |
|
730 |
| - |
731 |
| - |
732 |
| - for (final SubClusterInfo info : subClustersActive.values()) { |
733 |
| - compSvc.submit(new Callable<AppsInfo>() { |
734 |
| - @Override |
735 |
| - public AppsInfo call() { |
| 719 | + List<AppsInfo> appsInfos = subClusterInfos.parallelStream().map(subClusterInfo -> { |
| 720 | + try { |
736 | 721 | DefaultRequestInterceptorREST interceptor =
|
737 |
| - getOrCreateInterceptorForSubCluster( |
738 |
| - info.getSubClusterId(), info.getRMWebServiceAddress()); |
| 722 | + getOrCreateInterceptorForSubCluster(subClusterInfo); |
739 | 723 | AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery,
|
740 | 724 | statesQuery, finalStatusQuery, userQuery, queueQuery, count,
|
741 | 725 | startedBegin, startedEnd, finishBegin, finishEnd,
|
742 | 726 | applicationTypes, applicationTags, name, unselectedFields);
|
743 |
| - |
744 |
| - if (rmApps == null) { |
745 |
| - routerMetrics.incrMultipleAppsFailedRetrieved(); |
746 |
| - LOG.error("Subcluster {} failed to return appReport.", info.getSubClusterId()); |
747 |
| - return null; |
| 727 | + if (rmApps != null) { |
| 728 | + return rmApps; |
748 | 729 | }
|
749 |
| - return rmApps; |
750 |
| - } |
751 |
| - }); |
752 |
| - } |
753 |
| - |
754 |
| - // Collect all the responses in parallel |
755 |
| - for (int i = 0; i < subClustersActive.size(); i++) { |
756 |
| - try { |
757 |
| - Future<AppsInfo> future = compSvc.take(); |
758 |
| - AppsInfo appsResponse = future.get(); |
759 |
| - |
760 |
| - long stopTime = clock.getTime(); |
761 |
| - routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); |
762 |
| - |
763 |
| - if (appsResponse != null) { |
764 |
| - apps.addAll(appsResponse.getApps()); |
| 730 | + } catch (Exception e) { |
| 731 | + LOG.warn("Failed to get application report.", e); |
765 | 732 | }
|
766 |
| - } catch (Throwable e) { |
767 | 733 | routerMetrics.incrMultipleAppsFailedRetrieved();
|
768 |
| - LOG.warn("Failed to get application report", e); |
769 |
| - } |
770 |
| - } |
| 734 | + LOG.error("Subcluster {} failed to return appReport.", subClusterInfo.getSubClusterId()); |
| 735 | + return null; |
| 736 | + }).collect(Collectors.toList()); |
| 737 | + |
| 738 | + appsInfos.forEach(appsInfo -> { |
| 739 | + if (appsInfo != null) { |
| 740 | + apps.addAll(appsInfo.getApps()); |
| 741 | + long stopTime = clock.getTime(); |
| 742 | + routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); |
| 743 | + } |
| 744 | + }); |
771 | 745 |
|
772 | 746 | if (apps.getApps().isEmpty()) {
|
773 | 747 | return null;
|
|
0 commit comments