|
48 | 48 | import org.apache.hadoop.yarn.webapp.NotFoundException;
|
49 | 49 | import org.apache.http.NameValuePair;
|
50 | 50 | import org.apache.http.client.utils.URLEncodedUtils;
|
| 51 | +import org.slf4j.Logger; |
| 52 | +import org.slf4j.LoggerFactory; |
51 | 53 |
|
52 | 54 | import javax.servlet.http.HttpServletRequest;
|
53 | 55 | import javax.ws.rs.container.ContainerRequestContext;
|
54 | 56 |
|
55 | 57 | @Private
|
56 | 58 | @Evolving
|
57 | 59 | public class WebAppUtils {
|
| 60 | + private static final Logger LOG = LoggerFactory.getLogger(WebAppUtils.class); |
58 | 61 | public static final String WEB_APP_TRUSTSTORE_PASSWORD_KEY =
|
59 | 62 | "ssl.server.truststore.password";
|
60 | 63 | public static final String WEB_APP_KEYSTORE_PASSWORD_KEY =
|
@@ -107,17 +110,34 @@ public static void setNMWebAppHostNameAndPort(Configuration conf,
|
107 | 110 | */
|
108 | 111 | public static <T, R> R execOnActiveRM(Configuration conf,
|
109 | 112 | ThrowingBiFunction<String, T, R> func, T arg) throws Exception {
|
110 |
| - int haIndex = 0; |
111 |
| - if (HAUtil.isHAEnabled(conf)) { |
112 |
| - String activeRMId = RMHAUtils.findActiveRMHAId(conf); |
113 |
| - if (activeRMId != null) { |
114 |
| - haIndex = new ArrayList<>(HAUtil.getRMHAIds(conf)).indexOf(activeRMId); |
115 |
| - } else { |
116 |
| - throw new ConnectException("No Active RM available"); |
| 113 | + // If HA is not enabled we are running the function on the only RM that is available. |
| 114 | + if (!HAUtil.isHAEnabled(conf)) { |
| 115 | + String rmAddress = getRMWebAppURLWithScheme(conf, 0); |
| 116 | + return func.apply(rmAddress, arg); |
| 117 | + } |
| 118 | + |
| 119 | + // In HA mode we can find the active RM if user has admin permissions to check service states. |
| 120 | + // Otherwise, activeRMId will be null. |
| 121 | + List<String> rmIds = (List<String>) HAUtil.getRMHAIds(conf); |
| 122 | + String activeRMId = RMHAUtils.findActiveRMHAId(conf); |
| 123 | + if (activeRMId != null) { |
| 124 | + int activeRMIndex = rmIds.indexOf(activeRMId); |
| 125 | + String rmAddress = getRMWebAppURLWithScheme(conf, activeRMIndex); |
| 126 | + return func.apply(rmAddress, arg); |
| 127 | + } |
| 128 | + |
| 129 | + // If user does not have the necessary permissions we have to iterate through the RMs |
| 130 | + // to find the active one. |
| 131 | + for (int i = 0; i < rmIds.size(); i++) { |
| 132 | + try { |
| 133 | + String rmAddress = getRMWebAppURLWithScheme(conf, i); |
| 134 | + return func.apply(rmAddress, arg); |
| 135 | + } catch (Exception e) { |
| 136 | + // Log exception and try next RM if there are any. |
| 137 | + LOG.trace("Exception while connecting to RM", e); |
117 | 138 | }
|
118 | 139 | }
|
119 |
| - String rm1Address = getRMWebAppURLWithScheme(conf, haIndex); |
120 |
| - return func.apply(rm1Address, arg); |
| 140 | + throw new ConnectException("No active RM available to execute this command"); |
121 | 141 | }
|
122 | 142 |
|
123 | 143 | /** A BiFunction which throws on Exception. */
|
|
0 commit comments