2525
2626import org .apache .hadoop .conf .Configuration ;
2727import org .apache .hadoop .yarn .api .records .ApplicationSubmissionContext ;
28+ import org .apache .hadoop .yarn .api .protocolrecords .ReservationSubmissionRequest ;
2829import org .apache .hadoop .yarn .conf .YarnConfiguration ;
2930import org .apache .hadoop .yarn .exceptions .YarnException ;
3031import org .apache .hadoop .yarn .server .federation .policies .exceptions .FederationPolicyException ;
@@ -136,7 +137,7 @@ public SubClusterId getHomeSubcluster(
136137
137138 if (appSubmissionContext == null ) {
138139 throw new FederationPolicyException (
139- "The ApplicationSubmissionContext " + " cannot be null." );
140+ "The ApplicationSubmissionContext cannot be null." );
140141 }
141142
142143 String queue = appSubmissionContext .getQueue ();
@@ -148,51 +149,7 @@ public SubClusterId getHomeSubcluster(
148149 queue = YarnConfiguration .DEFAULT_QUEUE_NAME ;
149150 }
150151
151- // the facade might cache this request, based on its parameterization
152- SubClusterPolicyConfiguration configuration = null ;
153-
154- try {
155- configuration = federationFacade .getPolicyConfiguration (queue );
156- } catch (YarnException e ) {
157- String errMsg = "There is no policy configured for the queue: " + queue
158- + ", falling back to defaults." ;
159- LOG .warn (errMsg , e );
160- }
161-
162- // If there is no policy configured for this queue, fallback to the baseline
163- // policy that is configured either in the store or via XML config (and
164- // cached)
165- if (configuration == null ) {
166- LOG .warn ("There is no policies configured for queue: " + queue + " we"
167- + " fallback to default policy for: "
168- + YarnConfiguration .DEFAULT_FEDERATION_POLICY_KEY );
169-
170- queue = YarnConfiguration .DEFAULT_FEDERATION_POLICY_KEY ;
171- try {
172- configuration = federationFacade .getPolicyConfiguration (queue );
173- } catch (YarnException e ) {
174- String errMsg = "Cannot retrieve policy configured for the queue: "
175- + queue + ", falling back to defaults." ;
176- LOG .warn (errMsg , e );
177-
178- }
179- }
180-
181- // the fallback is not configure via store, but via XML, using
182- // previously loaded configuration.
183- if (configuration == null ) {
184- configuration =
185- cachedConfs .get (YarnConfiguration .DEFAULT_FEDERATION_POLICY_KEY );
186- }
187-
188- // if the configuration has changed since last loaded, reinit the policy
189- // based on current configuration
190- if (!cachedConfs .containsKey (queue )
191- || !cachedConfs .get (queue ).equals (configuration )) {
192- singlePolicyReinit (policyMap , cachedConfs , queue , configuration );
193- }
194-
195- FederationRouterPolicy policy = policyMap .get (queue );
152+ FederationRouterPolicy policy = getFederationRouterPolicy (cachedConfs , policyMap , queue );
196153 if (policy == null ) {
197154 // this should never happen, as the to maps are updated together
198155 throw new FederationPolicyException ("No FederationRouterPolicy found "
@@ -262,4 +219,92 @@ public synchronized void reset() {
262219
263220 }
264221
222+ /**
223+ * This method provides a wrapper of all policy functionalities for routing a
224+ * reservation. Internally it manages configuration changes, and policy
225+ * init/reinit.
226+ *
227+ * @param request the reservation to route.
228+ *
229+ * @return the id of the subcluster that will be the "home" for this
230+ * reservation.
231+ *
232+ * @throws YarnException if there are issues initializing policies, or no
233+ * valid sub-cluster id could be found for this reservation.
234+ */
235+ public SubClusterId getReservationHomeSubCluster (
236+ ReservationSubmissionRequest request ) throws YarnException {
237+
238+ // the maps are concurrent, but we need to protect from reset()
239+ // reinitialization mid-execution by creating a new reference local to this
240+ // method.
241+ Map <String , SubClusterPolicyConfiguration > cachedConfs = globalConfMap ;
242+ Map <String , FederationRouterPolicy > policyMap = globalPolicyMap ;
243+
244+ if (request == null ) {
245+ throw new FederationPolicyException (
246+ "The ReservationSubmissionRequest cannot be null." );
247+ }
248+
249+ String queue = request .getQueue ();
250+ FederationRouterPolicy policy = getFederationRouterPolicy (cachedConfs , policyMap , queue );
251+
252+ if (policy == null ) {
253+ // this should never happen, as the to maps are updated together
254+ throw new FederationPolicyException ("No FederationRouterPolicy found "
255+ + "for queue: " + request .getQueue () + " (while routing "
256+ + "reservation: " + request .getReservationId () + ") "
257+ + "and no default specified." );
258+ }
259+
260+ return policy .getReservationHomeSubcluster (request );
261+ }
262+
263+ private FederationRouterPolicy getFederationRouterPolicy (
264+ Map <String , SubClusterPolicyConfiguration > cachedConfiguration ,
265+ Map <String , FederationRouterPolicy > policyMap , String queue )
266+ throws FederationPolicyInitializationException {
267+
268+ // the facade might cache this request, based on its parameterization
269+ SubClusterPolicyConfiguration configuration = null ;
270+ String copyQueue = queue ;
271+
272+ try {
273+ configuration = federationFacade .getPolicyConfiguration (copyQueue );
274+ } catch (YarnException e ) {
275+ LOG .warn ("There is no policy configured for the queue: {}, falling back to defaults." ,
276+ copyQueue , e );
277+ }
278+
279+ // If there is no policy configured for this queue, fallback to the baseline
280+ // policy that is configured either in the store or via XML config (and cached)
281+ if (configuration == null ) {
282+ final String policyKey = YarnConfiguration .DEFAULT_FEDERATION_POLICY_KEY ;
283+ LOG .warn ("There is no policies configured for queue: {} " +
284+ "we fallback to default policy for: {}. " , copyQueue , policyKey );
285+ copyQueue = YarnConfiguration .DEFAULT_FEDERATION_POLICY_KEY ;
286+ try {
287+ configuration = federationFacade .getPolicyConfiguration (copyQueue );
288+ } catch (YarnException e ) {
289+ LOG .warn ("Cannot retrieve policy configured for the queue: {}, falling back to defaults." ,
290+ copyQueue , e );
291+ }
292+ }
293+
294+ // the fallback is not configure via store, but via XML, using
295+ // previously loaded configuration.
296+ if (configuration == null ) {
297+ configuration = cachedConfiguration .get (YarnConfiguration .DEFAULT_FEDERATION_POLICY_KEY );
298+ }
299+
300+ // if the configuration has changed since last loaded, reinit the policy
301+ // based on current configuration
302+ SubClusterPolicyConfiguration policyConfiguration =
303+ cachedConfiguration .getOrDefault (copyQueue , null );
304+ if (policyConfiguration == null || !policyConfiguration .equals (configuration )) {
305+ singlePolicyReinit (policyMap , cachedConfiguration , copyQueue , configuration );
306+ }
307+
308+ return policyMap .get (copyQueue );
309+ }
265310}
0 commit comments