|
19 | 19 | package org.apache.hadoop.yarn.server.router.clientrm;
|
20 | 20 |
|
21 | 21 | import org.apache.commons.collections.CollectionUtils;
|
| 22 | +import org.apache.commons.lang3.tuple.Pair; |
22 | 23 | import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
23 | 24 | import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
24 | 25 | import java.io.IOException;
|
25 | 26 | import java.lang.reflect.InvocationTargetException;
|
26 | 27 | import java.lang.reflect.Method;
|
27 |
| -import java.util.ArrayList; |
28 |
| -import java.util.Collection; |
29 |
| -import java.util.List; |
30 |
| -import java.util.Map; |
31 |
| -import java.util.Random; |
32 |
| -import java.util.TreeMap; |
| 28 | +import java.util.*; |
33 | 29 | import java.util.concurrent.BlockingQueue;
|
34 | 30 | import java.util.concurrent.Callable;
|
35 | 31 | import java.util.concurrent.ConcurrentHashMap;
|
@@ -662,14 +658,11 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
|
662 | 658 | RouterServerUtil.logAndThrowException("Missing getApplications request.", null);
|
663 | 659 | }
|
664 | 660 | long startTime = clock.getTime();
|
665 |
| - Map<SubClusterId, SubClusterInfo> subclusters = |
666 |
| - federationFacade.getSubClusters(true); |
667 | 661 | ClientMethod remoteMethod = new ClientMethod("getApplications",
|
668 | 662 | new Class[] {GetApplicationsRequest.class}, new Object[] {request});
|
669 | 663 | Map<SubClusterId, GetApplicationsResponse> applications = null;
|
670 | 664 | try {
|
671 |
| - applications = invokeConcurrent(subclusters.keySet(), remoteMethod, |
672 |
| - GetApplicationsResponse.class); |
| 665 | + applications = invokeConcurrent(remoteMethod, GetApplicationsResponse.class); |
673 | 666 | } catch (Exception ex) {
|
674 | 667 | routerMetrics.incrMultipleAppsFailedRetrieved();
|
675 | 668 | RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex);
|
@@ -703,59 +696,51 @@ public GetClusterMetricsResponse getClusterMetrics(
|
703 | 696 | return RouterYarnClientUtils.merge(clusterMetrics);
|
704 | 697 | }
|
705 | 698 |
|
706 |
| - <R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds, |
707 |
| - ClientMethod request, Class<R> clazz) throws YarnException, IOException { |
708 |
| - List<Callable<Object>> callables = new ArrayList<>(); |
709 |
| - List<Future<Object>> futures = new ArrayList<>(); |
710 |
| - Map<SubClusterId, IOException> exceptions = new TreeMap<>(); |
711 |
| - for (SubClusterId subClusterId : clusterIds) { |
| 699 | + <R> Map<SubClusterId, R> invokeConcurrent(ClientMethod request, Class<R> clazz) |
| 700 | + throws YarnException { |
| 701 | + |
| 702 | + Collection<SubClusterId> subClusterIds = federationFacade.getActiveSubClusterIds(); |
| 703 | + |
| 704 | + List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>(); |
| 705 | + List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>(); |
| 706 | + Map<SubClusterId, Exception> exceptions = new TreeMap<>(); |
| 707 | + |
| 708 | + // Generate parallel Callable tasks |
| 709 | + for (SubClusterId subClusterId : subClusterIds) { |
712 | 710 | callables.add(() -> {
|
713 | 711 | ApplicationClientProtocol protocol =
|
714 | 712 | getClientRMProxyForSubCluster(subClusterId);
|
715 | 713 | Method method = ApplicationClientProtocol.class
|
716 | 714 | .getMethod(request.getMethodName(), request.getTypes());
|
717 |
| - return method.invoke(protocol, request.getParams()); |
| 715 | + Object result = method.invoke(protocol, request.getParams()); |
| 716 | + return Pair.of(subClusterId, result); |
718 | 717 | });
|
719 | 718 | }
|
| 719 | + |
| 720 | + // Get results from multiple threads |
720 | 721 | Map<SubClusterId, R> results = new TreeMap<>();
|
721 | 722 | try {
|
722 | 723 | futures.addAll(executorService.invokeAll(callables));
|
723 |
| - for (int i = 0; i < futures.size(); i++) { |
724 |
| - Object clusterObj = CollectionUtils.get(clusterIds, i); |
725 |
| - SubClusterId subClusterId = SubClusterId.class.cast(clusterObj); |
| 724 | + futures.stream().forEach(future -> { |
| 725 | + SubClusterId subClusterId = null; |
726 | 726 | try {
|
727 |
| - Future<Object> future = futures.get(i); |
728 |
| - Object result = future.get(); |
| 727 | + Pair<SubClusterId, Object> mapFuture = future.get(); |
| 728 | + subClusterId = mapFuture.getKey(); |
| 729 | + Object result = mapFuture.getValue(); |
729 | 730 | results.put(subClusterId, clazz.cast(result));
|
730 |
| - } catch (ExecutionException ex) { |
731 |
| - Throwable cause = ex.getCause(); |
732 |
| - LOG.debug("Cannot execute {} on {}: {}", request.getMethodName(), |
733 |
| - subClusterId.getId(), cause.getMessage()); |
734 |
| - IOException ioe; |
735 |
| - if (cause instanceof IOException) { |
736 |
| - ioe = (IOException) cause; |
737 |
| - } else if (cause instanceof YarnException) { |
738 |
| - throw (YarnException) cause; |
739 |
| - } else { |
740 |
| - ioe = new IOException( |
741 |
| - "Unhandled exception while calling " + request.getMethodName() |
742 |
| - + ": " + cause.getMessage(), cause); |
743 |
| - } |
744 |
| - // Store the exceptions |
745 |
| - exceptions.put(subClusterId, ioe); |
| 731 | + } catch (InterruptedException | ExecutionException e) { |
| 732 | + Throwable cause = e.getCause(); |
| 733 | + LOG.error("Cannot execute {} on {}: {}", request.getMethodName(), |
| 734 | + subClusterId.getId(), cause.getMessage()); |
| 735 | + exceptions.put(subClusterId, e); |
746 | 736 | }
|
747 |
| - } |
748 |
| - if (results.isEmpty() && !clusterIds.isEmpty()) { |
749 |
| - Object clusterObj = CollectionUtils.get(clusterIds, 0); |
750 |
| - SubClusterId subClusterId = SubClusterId.class.cast(clusterObj); |
751 |
| - IOException ioe = exceptions.get(subClusterId); |
752 |
| - if (ioe != null) { |
753 |
| - throw ioe; |
754 |
| - } |
755 |
| - } |
| 737 | + }); |
756 | 738 | } catch (InterruptedException e) {
|
757 | 739 | throw new YarnException(e);
|
758 | 740 | }
|
| 741 | + |
| 742 | + |
| 743 | + |
759 | 744 | return results;
|
760 | 745 | }
|
761 | 746 |
|
|
0 commit comments