-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-5871. [RESERVATION] Add support for reservation-based routing. #4632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
f126054
YARN-5871. [RESERVATION] Add support for reservation-based routing.
7394060
YARN-5871. [RESERVATION] Add support for reservation-based routing.
9f72ed7
YARN-5871. [RESERVATION] Add support for reservation-based routing.
d15a2ed
YARN-5871. Fix CheckStyle.
8bccea9
YARN-5871. Add Junit Test
e79188b
YARN-5871. Add Junit Test
6161eff
YARN-5871. Add Junit Test
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,15 +18,24 @@ | |
|
|
||
| package org.apache.hadoop.yarn.server.federation.policies.router; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Collections; | ||
|
|
||
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; | ||
| import org.apache.hadoop.yarn.api.records.ReservationId; | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration; | ||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; | ||
| import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; | ||
| import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; | ||
| import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; | ||
| import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; | ||
|
|
||
| /** | ||
| * Base abstract class for {@link FederationRouterPolicy} implementations, that | ||
|
|
@@ -63,4 +72,70 @@ public void validate(ApplicationSubmissionContext appSubmissionContext) | |
| } | ||
| } | ||
|
|
||
| protected abstract SubClusterId chooseSubCluster(String queue, | ||
| Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException; | ||
|
|
||
| protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters( | ||
| ReservationId reservationId, Map<SubClusterId, SubClusterInfo> activeSubClusters) | ||
| throws YarnException { | ||
|
|
||
| // if a reservation exists limit scope to the sub-cluster this | ||
| // reservation is mapped to | ||
| if (reservationId != null) { | ||
|
|
||
| // note this might throw YarnException if the reservation is | ||
| // unknown. This is to be expected, and should be handled by | ||
| // policy invoker. | ||
| SubClusterId resSubCluster = | ||
| getPolicyContext().getFederationStateStoreFacade(). | ||
| getReservationHomeSubCluster(reservationId); | ||
|
|
||
| return Collections.singletonMap(resSubCluster, activeSubClusters.get(resSubCluster)); | ||
| } | ||
|
|
||
| return activeSubClusters; | ||
| } | ||
|
|
||
| @Override | ||
| public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext, | ||
| List<SubClusterId> blackLists) throws YarnException { | ||
|
|
||
| // null checks and default-queue behavior | ||
| validate(appContext); | ||
|
|
||
| // apply filtering based on reservation location and active sub-clusters | ||
| Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters( | ||
| appContext.getReservationID(), getActiveSubclusters()); | ||
|
|
||
| FederationPolicyUtils.validateSubClusterAvailability( | ||
| new ArrayList<>(filteredSubClusters.keySet()), blackLists); | ||
|
|
||
| // remove black SubCluster | ||
| if (blackLists != null) { | ||
| blackLists.forEach(filteredSubClusters::remove); | ||
| } | ||
|
|
||
| // pick the chosen subCluster from the active ones | ||
| return chooseSubCluster(appContext.getQueue(), filteredSubClusters); | ||
| } | ||
|
|
||
|
|
||
| @Override | ||
| public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request) | ||
| throws YarnException { | ||
| if (request == null) { | ||
| throw new FederationPolicyException( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One line. |
||
| "The ReservationSubmissionRequest cannot be null."); | ||
| } | ||
|
|
||
| if (request.getQueue() == null) { | ||
| request.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); | ||
| } | ||
|
|
||
| // apply filtering based on reservation location and active sub-clusters | ||
| Map<SubClusterId, SubClusterInfo> filteredSubClusters = getActiveSubclusters(); | ||
|
|
||
| // pick the chosen subCluster from the active ones | ||
| return chooseSubCluster(request.getQueue(), filteredSubClusters); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,11 +22,9 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; | ||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; | ||
| import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; | ||
| import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; | ||
| import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; | ||
|
|
@@ -50,53 +48,12 @@ public void reinitialize( | |
| setPolicyContext(federationPolicyContext); | ||
| } | ||
|
|
||
| /** | ||
| * Simply picks from alphabetically-sorted active subclusters based on the | ||
| * hash of quey name. Jobs of the same queue will all be routed to the same | ||
| * sub-cluster, as far as the number of active sub-cluster and their names | ||
| * remain the same. | ||
| * | ||
| * @param appSubmissionContext the {@link ApplicationSubmissionContext} that | ||
| * has to be routed to an appropriate subCluster for execution. | ||
| * | ||
| * @param blackListSubClusters the list of subClusters as identified by | ||
| * {@link SubClusterId} to blackList from the selection of the home | ||
| * subCluster. | ||
| * | ||
| * @return a hash-based chosen {@link SubClusterId} that will be the "home" | ||
| * for this application. | ||
| * | ||
| * @throws YarnException if there are no active subclusters. | ||
| */ | ||
| @Override | ||
| public SubClusterId getHomeSubcluster( | ||
| ApplicationSubmissionContext appSubmissionContext, | ||
| List<SubClusterId> blackListSubClusters) throws YarnException { | ||
|
|
||
| // throws if no active subclusters available | ||
| Map<SubClusterId, SubClusterInfo> activeSubclusters = | ||
| getActiveSubclusters(); | ||
|
|
||
| FederationPolicyUtils.validateSubClusterAvailability( | ||
| new ArrayList<SubClusterId>(activeSubclusters.keySet()), | ||
| blackListSubClusters); | ||
|
|
||
| if (blackListSubClusters != null) { | ||
|
|
||
| // Remove from the active SubClusters from StateStore the blacklisted ones | ||
| for (SubClusterId scId : blackListSubClusters) { | ||
| activeSubclusters.remove(scId); | ||
| } | ||
| } | ||
|
|
||
| validate(appSubmissionContext); | ||
|
|
||
| int chosenPosition = Math.abs( | ||
| appSubmissionContext.getQueue().hashCode() % activeSubclusters.size()); | ||
|
|
||
| List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet()); | ||
| protected SubClusterId chooseSubCluster(String queue, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have a javadoc in the parent? |
||
| Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException { | ||
| int chosenPosition = Math.abs(queue.hashCode() % preSelectSubClusters.size()); | ||
| List<SubClusterId> list = new ArrayList<>(preSelectSubClusters.keySet()); | ||
| Collections.sort(list); | ||
| return list.get(chosenPosition); | ||
| } | ||
|
|
||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we have a method that takes both arguments separate?