Skip to content

Commit a452103

Browse files
author
liubin04
committed
YARN-11871. Make client retry when no active sub-cluster available.
1 parent bcf8c35 commit a452103

File tree

31 files changed

+151
-88
lines changed

31 files changed

+151
-88
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ protected static RetryPolicy createRetryPolicy(Configuration conf,
343343

344344
return RetryPolicies.failoverOnNetworkException(
345345
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
346-
failoverSleepBaseMs, failoverSleepMaxMs);
346+
maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs);
347347
}
348348

349349
if (rmConnectionRetryIntervalMS < 0) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.hadoop.yarn.server.federation.policies;
2020

21+
import java.io.IOException;
2122
import java.util.Map;
2223

24+
import org.apache.hadoop.ipc.RetriableException;
2325
import org.apache.hadoop.yarn.exceptions.YarnException;
2426
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
2527
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@@ -137,16 +139,16 @@ public void setPolicyContext(
137139
*
138140
* @return the map of ids to info for all active subclusters.
139141
*
140-
* @throws YarnException if we can't get the list.
142+
* @throws IOException if we can't get the list.
141143
*/
142144
protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
143-
throws YarnException {
145+
throws YarnException, IOException {
144146

145147
Map<SubClusterId, SubClusterInfo> activeSubclusters =
146148
getPolicyContext().getFederationStateStoreFacade().getSubClusters(true);
147149

148150
if (activeSubclusters == null || activeSubclusters.size() < 1) {
149-
throw new NoActiveSubclustersException(
151+
throw new RetriableException(
150152
"Zero active subclusters, cannot pick where to send job.");
151153
}
152154
return activeSubclusters;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.hadoop.yarn.server.federation.policies;
1919

20+
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122
import java.nio.charset.StandardCharsets;
2223
import java.util.ArrayList;
@@ -25,6 +26,7 @@
2526

2627
import org.apache.hadoop.classification.InterfaceAudience.Private;
2728
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.ipc.RetriableException;
2830
import org.apache.hadoop.yarn.conf.YarnConfiguration;
2931
import org.apache.hadoop.yarn.exceptions.YarnException;
3032
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
@@ -185,12 +187,12 @@ public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
185187
* @param blackListSubClusters the list of subClusters as identified by
186188
* {@link SubClusterId} to blackList from the selection of the home
187189
* subCluster.
188-
* @throws FederationPolicyException if there are no usable subclusters.
190+
* @throws IOException if there are no usable subclusters.
189191
*/
190192
public static void validateSubClusterAvailability(
191193
Collection<SubClusterId> activeSubClusters,
192194
Collection<SubClusterId> blackListSubClusters)
193-
throws FederationPolicyException {
195+
throws IOException {
194196
if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
195197
if (blackListSubClusters == null) {
196198
return;
@@ -202,7 +204,7 @@ public static void validateSubClusterAvailability(
202204
}
203205
}
204206
}
205-
throw new FederationPolicyException(
207+
throw new RetriableException(
206208
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
207209
}
208210

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.hadoop.yarn.server.federation.policies;
1919

20+
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122
import java.nio.charset.StandardCharsets;
2223
import java.util.List;
@@ -127,7 +128,7 @@ public RouterPolicyFacade(Configuration conf,
127128
*/
128129
public SubClusterId getHomeSubcluster(
129130
ApplicationSubmissionContext appSubmissionContext,
130-
List<SubClusterId> blackListSubClusters) throws YarnException {
131+
List<SubClusterId> blackListSubClusters) throws YarnException, IOException {
131132

132133
// the maps are concurrent, but we need to protect from reset()
133134
// reinitialization mid-execution by creating a new reference local to this
@@ -233,7 +234,7 @@ public synchronized void reset() {
233234
* valid sub-cluster id could be found for this reservation.
234235
*/
235236
public SubClusterId getReservationHomeSubCluster(
236-
ReservationSubmissionRequest request) throws YarnException {
237+
ReservationSubmissionRequest request) throws YarnException, IOException {
237238

238239
// the maps are concurrent, but we need to protect from reset()
239240
// reinitialization mid-execution by creating a new reference local to this

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
2020

21+
import java.io.IOException;
2122
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
@@ -51,7 +52,7 @@ public void reinitialize(
5152
@Override
5253
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
5354
List<ResourceRequest> resourceRequests,
54-
Set<SubClusterId> timedOutSubClusters) throws YarnException {
55+
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException {
5556

5657
Map<SubClusterId, SubClusterInfo> activeSubclusters =
5758
getActiveSubclusters();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
1919

20+
import java.io.IOException;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Set;
@@ -49,7 +50,7 @@ public interface FederationAMRMProxyPolicy
4950
*/
5051
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
5152
List<ResourceRequest> resourceRequests,
52-
Set<SubClusterId> timedOutSubClusters) throws YarnException;
53+
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException;
5354

5455
/**
5556
* This method should be invoked to notify the policy about responses being

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818

1919
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
2020

21+
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Set;
2627

28+
import org.apache.hadoop.ipc.RetriableException;
2729
import org.apache.hadoop.yarn.api.records.ResourceRequest;
2830
import org.apache.hadoop.yarn.exceptions.YarnException;
2931
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -57,14 +59,14 @@ public void reinitialize(
5759
@Override
5860
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
5961
List<ResourceRequest> resourceRequests,
60-
Set<SubClusterId> timedOutSubClusters) throws YarnException {
62+
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException {
6163
if (homeSubcluster == null) {
6264
throw new FederationPolicyException("No home subcluster available");
6365
}
6466

6567
Map<SubClusterId, SubClusterInfo> active = getActiveSubclusters();
6668
if (!active.containsKey(homeSubcluster)) {
67-
throw new FederationPolicyException(
69+
throw new RetriableException(
6870
"The local subcluster " + homeSubcluster + " is not active");
6971
}
7072

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
2020

21+
import java.io.IOException;
2122
import java.util.ArrayList;
2223
import java.util.Collection;
2324
import java.util.HashMap;
@@ -34,6 +35,7 @@
3435

3536
import org.apache.commons.collections4.MapUtils;
3637
import org.apache.hadoop.conf.Configuration;
38+
import org.apache.hadoop.ipc.RetriableException;
3739
import org.apache.hadoop.util.StringUtils;
3840
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
3941
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
@@ -46,7 +48,6 @@
4648
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
4749
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
4850
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
49-
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
5051
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
5152
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
5253
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -267,7 +268,7 @@ public void notifyOfResponse(SubClusterId subClusterId,
267268
@Override
268269
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
269270
List<ResourceRequest> resourceRequests,
270-
Set<SubClusterId> timedOutSubClusters) throws YarnException {
271+
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException {
271272

272273
// object used to accumulate statistics about the answer, initialize with
273274
// active subclusters. Create a new instance per call because this method
@@ -712,7 +713,8 @@ protected final class AllocationBookkeeper {
712713

713714
private void reinitialize(
714715
Map<SubClusterId, SubClusterInfo> activeSubclusters,
715-
Set<SubClusterId> timedOutSubClusters, Configuration pConf) throws YarnException {
716+
Set<SubClusterId> timedOutSubClusters, Configuration pConf)
717+
throws YarnException, IOException {
716718

717719
if (MapUtils.isEmpty(activeSubclusters)) {
718720
throw new YarnRuntimeException("null activeSubclusters received");
@@ -752,7 +754,7 @@ private void reinitialize(
752754
String errorMsg = "None of the subClusters enabled in this Policy (weight > 0) are "
753755
+ "currently active we cannot forward the ResourceRequest(s)";
754756
if (failOnError) {
755-
throw new NoActiveSubclustersException(errorMsg);
757+
throw new RetriableException(errorMsg);
756758
} else {
757759
LOG.error(errorMsg + ", continuing by enabling all active subClusters.");
758760
activeAndEnabledSC.addAll(activeSubclusters.keySet());

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.yarn.server.federation.policies.router;
2020

21+
import java.io.IOException;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Collections;
@@ -84,7 +85,8 @@ public void validate(ApplicationSubmissionContext appSubmissionContext)
8485
* @throws YarnException if the policy fails to choose a sub-cluster
8586
*/
8687
protected abstract SubClusterId chooseSubCluster(String queue,
87-
Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException;
88+
Map<SubClusterId, SubClusterInfo> preSelectSubClusters)
89+
throws YarnException, IOException;
8890

8991
/**
9092
* Filter chosen SubCluster based on reservationId.
@@ -130,11 +132,11 @@ protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
130132
* @return a hash-based chosen {@link SubClusterId} that will be the "home"
131133
* for this application.
132134
*
133-
* @throws YarnException if there are no active subclusters.
135+
* @throws IOException if there are no active subclusters.
134136
*/
135137
@Override
136138
public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,
137-
List<SubClusterId> blackLists) throws YarnException {
139+
List<SubClusterId> blackLists) throws YarnException, IOException {
138140

139141
// null checks and default-queue behavior
140142
validate(appContext);
@@ -169,7 +171,7 @@ public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,
169171
*/
170172
@Override
171173
public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request)
172-
throws YarnException {
174+
throws YarnException, IOException {
173175
if (request == null) {
174176
throw new FederationPolicyException("The ReservationSubmissionRequest cannot be null.");
175177
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.hadoop.yarn.server.federation.policies.router;
1919

20+
import java.io.IOException;
2021
import java.util.List;
2122

2223
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
@@ -49,7 +50,7 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
4950
*/
5051
SubClusterId getHomeSubcluster(
5152
ApplicationSubmissionContext appSubmissionContext,
52-
List<SubClusterId> blackListSubClusters) throws YarnException;
53+
List<SubClusterId> blackListSubClusters) throws YarnException, IOException;
5354

5455
/**
5556
* Determines the sub-cluster where a ReservationSubmissionRequest should be
@@ -61,5 +62,5 @@ SubClusterId getHomeSubcluster(
6162
* @throws YarnException if the policy fails to choose a sub-cluster
6263
*/
6364
SubClusterId getReservationHomeSubcluster(
64-
ReservationSubmissionRequest request) throws YarnException;
65+
ReservationSubmissionRequest request) throws YarnException, IOException;
6566
}

0 commit comments

Comments
 (0)