YARN-11235. Refactor Policy Code and Define getReservationHomeSubcluster #4656
Conversation
|
@goiri This is the first PR after the YARN-5871 split. Please help review the code. Thank you very much! |
|
🎊 +1 overall
This message was automatically generated. |
| public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request) | ||
| throws YarnException { | ||
| if (request == null) { | ||
| throw new FederationPolicyException( |
Single line?
I will fix it.
| nodeRequest = rr; | ||
| } catch (YarnException e) { | ||
| LOG.error("Cannot resolve node : {}", e.getLocalizedMessage()); | ||
| LOG.error("Cannot resolve node.", e); |
I prefer not having the full exception. Thoughts?
Thanks for your suggestion, I will modify the code!
| } | ||
| protected SubClusterId chooseSubCluster( | ||
| String queue, Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException { | ||
| if(preSelectSubClusters == null || preSelectSubClusters.size() == 0) { |
space after if
isEmpty()
I will fix it.
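The two review notes above (a space after `if`, and `isEmpty()` instead of `size() == 0`) can be sketched in plain Java as follows. The class and method names are hypothetical and are not part of the PR; `Map<?, ?>` stands in for `Map<SubClusterId, SubClusterInfo>`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class PreSelectCheckSketch {

  // Hypothetical helper: true when there is no sub-cluster to choose from.
  static boolean isNullOrEmpty(Map<?, ?> preSelectSubClusters) {
    // isEmpty() states the intent directly instead of comparing size() to 0.
    return preSelectSubClusters == null || preSelectSubClusters.isEmpty();
  }

  public static void main(String[] args) {
    Map<String, String> one = new HashMap<>();
    one.put("sc1", "info");
    System.out.println(isNullOrEmpty(null));
    System.out.println(isNullOrEmpty(Collections.emptyMap()));
    System.out.println(isNullOrEmpty(one));
  }
}
```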
| // changes dynamically (and this would unfairly spread the load to | ||
| // sub-clusters adjacent to an inactive one), hence we need to count/scan | ||
| // the list and based on weight pick the next sub-cluster. | ||
| Map<SubClusterIdInfo, Float> weights = |
single line
I will modify the code.
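The code comment in this hunk describes scanning the weight list and picking the next sub-cluster by weight among the active ones. A minimal plain-Java sketch of that idea follows. It is not the PR's implementation: `String` keys stand in for `SubClusterIdInfo`, the class and method names are hypothetical, and `IllegalStateException` stands in for `FederationPolicyException`.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;

final class WeightedPickSketch {

  // An entry counts only if its sub-cluster is active and its weight is positive.
  static boolean isEligible(Map.Entry<String, Float> entry, Set<String> active) {
    return entry.getKey() != null && active.contains(entry.getKey())
        && entry.getValue() != null && entry.getValue() > 0f;
  }

  // Picks one eligible key with probability proportional to its weight.
  static String pick(Map<String, Float> weights, Set<String> active, Random rand) {
    float total = 0f;
    for (Map.Entry<String, Float> entry : weights.entrySet()) {
      if (isEligible(entry, active)) {
        total += entry.getValue();
      }
    }
    if (total <= 0f) {
      throw new IllegalStateException("No positive weight found on active subclusters");
    }
    // Sample a point in [0, total) and walk the eligible entries until it is covered.
    float sample = rand.nextFloat() * total;
    String picked = null;
    for (Map.Entry<String, Float> entry : weights.entrySet()) {
      if (!isEligible(entry, active)) {
        continue;
      }
      picked = entry.getKey();
      sample -= entry.getValue();
      if (sample < 0f) {
        break;
      }
    }
    // If rounding left the sample uncovered, the last eligible key is returned.
    return picked;
  }

  public static void main(String[] args) {
    Map<String, Float> weights = new LinkedHashMap<>();
    weights.put("sc1", 0.0f);
    weights.put("sc2", 1.0f);
    weights.put("sc3", 3.0f);
    Set<String> active = new HashSet<>();
    active.add("sc1");
    active.add("sc2");
    System.out.println(pick(weights, active, new Random()));
  }
}
```

In the `main` example only `sc2` is both active and positively weighted, so it is always the one chosen. Inactive sub-clusters are skipped during the scan rather than removed from the weight map, which matches the concern in the comment about load being spread unfairly to neighbours of an inactive sub-cluster.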
| } | ||
| if (entry.getKey() != null | ||
| && activeSubclusters.containsKey(entry.getKey().toId())) { | ||
| if (entry.getKey() != null && preSelectSubClusters.containsKey(entry.getKey().toId())) { |
Extract.
Thank you for your suggestion.
| if (pickedIndex == -1) { | ||
| throw new FederationPolicyException( | ||
| "No positive weight found on active subclusters"); | ||
| "No positive weight found on active subClusters."); |
I would leave the old capitalization.
I will fix it.
| import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; | ||
| import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; | ||
| import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.*; |
Avoid wildcard imports.
I will fix it.
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Please help to review the code again, thank you very much! |
| try { | ||
| configuration = federationFacade.getPolicyConfiguration(copyQueue); | ||
| } catch (YarnException e) { | ||
| LOG.warn("There is no policy configured for the queue: {}, " + |
We can fit the string into a single line.
Thanks for your help reviewing the code, I will modify the code.
| // cached) | ||
| if (configuration == null) { | ||
| final String policyKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; | ||
| LOG.warn("There is no policies configured for queue: {} " + |
We can rearrange so the string is a single line.
I will fix it.
|
|
||
| // if the configuration has changed since last loaded, reinit the policy | ||
| // based on current configuration | ||
| if (!cachedConfiguration.containsKey(copyQueue) |
Fix indentation
I will fix it.
| // if the configuration has changed since last loaded, reinit the policy | ||
| // based on current configuration | ||
| if (!cachedConfiguration.containsKey(copyQueue) | ||
| || !cachedConfiguration.get(copyQueue).equals(configuration)) { |
I would extract to make it more readable.
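One way to read the extraction suggested here is to move the cache comparison into a named predicate. The sketch below is an assumption about what that could look like, not the change that was merged: `needsReinit` is a hypothetical name, the generic type `C` stands in for the policy configuration type, and it assumes the cache never stores `null` values.

```java
import java.util.HashMap;
import java.util.Map;

final class PolicyCacheSketch {

  // True when the queue has no cached configuration or the cached one differs.
  // Uses a single lookup instead of containsKey() followed by get().
  static <C> boolean needsReinit(Map<String, C> cachedConfiguration, String queue, C latest) {
    C previous = cachedConfiguration.get(queue);
    return previous == null || !previous.equals(latest);
  }

  public static void main(String[] args) {
    Map<String, String> cache = new HashMap<>();
    cache.put("queueA", "conf-v1");
    System.out.println(needsReinit(cache, "queueA", "conf-v1"));
    System.out.println(needsReinit(cache, "queueA", "conf-v2"));
    System.out.println(needsReinit(cache, "queueB", "conf-v1"));
  }
}
```

The call site then reads as `if (needsReinit(cachedConfiguration, copyQueue, configuration))`, which keeps the intent of the original comment ("if the configuration has changed since last loaded") visible in the code itself.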
| nodeRequest = rr; | ||
| } catch (YarnException e) { | ||
| LOG.error("Cannot resolve node : {}", e.getLocalizedMessage()); | ||
| LOG.error("Cannot resolve node."); |
Doesn't it make sense to at least keep the message in the output?
I will fix it and use e.getMessage() .
| } catch (YarnException e) { | ||
| LOG.error("Validating resource requests failed, Falling back to " | ||
| + "WeightedRandomRouterPolicy placement: " + e.getMessage()); | ||
| + "WeightedRandomRouterPolicy placement.", e); |
Single line string?
I will fix it.
| /** | ||
| * This implements a policy that interprets "weights" as a ordered list of | ||
| * preferences among sub-clusters. Highest weight among active subclusters is | ||
| * preferences among sub-clusters. Highest weight among active subClusters is |
We keep changing the capitalization.
I think "Subcluster" is fine; no need to capitalize the C.
I will fix it.
|
💔 -1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Please help to review the code again, thank you very much! |
| Map<SubClusterId, SubClusterInfo> filteredSubClusters = prefilterSubClusters( | ||
| appContext.getReservationID(), getActiveSubclusters()); | ||
|
|
||
| FederationPolicyUtils.validateSubClusterAvailability( |
Can we have validateSubClusterAvailability taking Collection so we don't need temp data structures?
| FederationStateStoreFacade fedFacade = | ||
| FederationStateStoreFacade.getInstance(); | ||
| YarnConfiguration conf = new YarnConfiguration(); | ||
| conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0"); |
setInt?
| public FederationStateStoreFacade getMemoryFacade() throws YarnException { | ||
|
|
||
| // setting up a store and its facade (with caching off) | ||
| FederationStateStoreFacade fedFacade = |
One line
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
|
|
Avoid
I will fix it.
|
|
||
| @Test | ||
| public void testNullReservationContext() throws Exception { | ||
| FederationRouterPolicy policy = ((FederationRouterPolicy) getPolicy()); |
Too many spaces after the =
I will fix it.
| Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); | ||
| Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>(); | ||
|
|
||
| long now = System.currentTimeMillis(); |
Time.now()?
I will fix it.
| .getHomeSubcluster(getApplicationSubmissionContext(), null); | ||
| fail(); | ||
| } catch (YarnException ex) { | ||
| Assert.assertTrue( |
LambdaTestUtils?
Thanks for your suggestion, I will modify the code.
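The suggestion refers to Hadoop's `LambdaTestUtils` test helper, whose `intercept` method replaces the `try { ...; fail(); } catch (...) { assertTrue(...) }` pattern in the hunk above. The block below is only a plain-Java stand-in written to show the shape of that pattern without a Hadoop dependency. It is not Hadoop's implementation, and `InterceptSketch` is a hypothetical name.

```java
import java.util.concurrent.Callable;

final class InterceptSketch {

  // Runs eval and returns the thrown exception if it has the expected type and
  // its message contains the expected text; fails with AssertionError otherwise.
  static <E extends Throwable> E intercept(Class<E> clazz, String contained, Callable<?> eval) {
    try {
      eval.call();
    } catch (Throwable t) {
      String message = t.getMessage();
      if (clazz.isInstance(t) && message != null && message.contains(contained)) {
        return clazz.cast(t);
      }
      throw new AssertionError("Unexpected exception: " + t, t);
    }
    throw new AssertionError("Expected " + clazz.getName() + " was not thrown");
  }

  public static void main(String[] args) {
    IllegalStateException ex = intercept(IllegalStateException.class, "null",
        () -> { throw new IllegalStateException("The context is null."); });
    System.out.println(ex.getMessage());
  }
}
```

With a helper of this shape, the expected type, the expected message fragment, and the call under test sit in one statement, and a missing exception fails the test without a separate `fail()` call.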
| when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); | ||
| when(sci.getSubClusterId()).thenReturn(sc.toId()); | ||
| getActiveSubclusters().put(sc.toId(), sci); | ||
| long now = System.currentTimeMillis(); |
Time.now()
I will fix it.
...src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
Outdated
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Please help to review the code again, thank you very much! I will follow up with YARN-11236. |
|
💔 -1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
.../java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClustersInfoResponse.java
| public static GetSubClustersInfoResponse newInstance( | ||
| Collection<SubClusterInfo> subClusters) { | ||
| GetSubClustersInfoResponse subClusterInfos = | ||
| Records.newRecord(GetSubClustersInfoResponse.class); |
Indentation doesn't seem correct.
I think it even fits in one line.
I will fix it.
.../test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
| when(sci.getSubClusterId()).thenReturn(sc.toId()); | ||
| getActiveSubclusters().put(sc.toId(), sci); | ||
| long now = Time.now(); | ||
| SubClusterInfo federationSubClusterInfo = |
Not sure the indentation is correct.
Let's do:
SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
sc.toId(), "dns1:80", "dns1:81", "dns1:82", "dns1:83",
now - 1000, SubClusterState.SC_RUNNING, now - 2000, generateClusterMetricsInfo(i));
I will fix it.
| when(sci.getSubClusterId()).thenReturn(sc.toId()); | ||
| getActiveSubclusters().put(sc.toId(), sci); | ||
| long now = Time.now(); | ||
| SubClusterInfo federationSubClusterInfo = |
Same as before.
I will fix it.
| if (activeSubClusters == null) { | ||
| activeSubClusters = new HashMap<>(); | ||
| } | ||
| GetSubClustersInfoResponse response = |
One line?
I will fix it.
|
💔 -1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Please help to review the code again, thank you very much! |
This reverts commit 874c9c5.
|
🎊 +1 overall
This message was automatically generated. |
|
|
||
| FederationPolicyUtils.validateSubClusterAvailability( | ||
| list, blackListSubClusters); | ||
| FederationPolicyUtils.validateSubClusterAvailability(list, blackListSubClusters); |
We could just do:
FederationPolicyUtils.validateSubClusterAvailability(activeSubclusters.keySet(), blackListSubClusters);
Thanks for your suggestion, I will modify the code.
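The point of accepting a `Collection` is that callers can pass `keySet()` directly and no temporary list is needed. The sketch below illustrates that with a hypothetical validator. The real `validateSubClusterAvailability` body, its exception type, and its messages are not shown in this thread, so the checks and `IllegalStateException` here are assumptions.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AvailabilitySketch {

  // Hypothetical validator: fails when no sub-cluster is active or when every
  // active sub-cluster is blacklisted. Accepts any Collection, including keySet().
  static <T> void validateAvailability(Collection<T> active, Collection<T> blackList) {
    if (active == null || active.isEmpty()) {
      throw new IllegalStateException("No active sub-cluster available.");
    }
    if (blackList != null && blackList.containsAll(active)) {
      throw new IllegalStateException("All active sub-clusters are blacklisted.");
    }
  }

  public static void main(String[] args) {
    Map<String, String> activeSubclusters = new HashMap<>();
    activeSubclusters.put("sc1", "info1");
    activeSubclusters.put("sc2", "info2");
    List<String> blackList = Collections.singletonList("sc1");
    // keySet() is passed as-is; no intermediate list is built.
    validateAvailability(activeSubclusters.keySet(), blackList);
    System.out.println("validation passed");
  }
}
```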
|
@goiri Please help to review the code again, thank you very much! |
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Thank you for helping to review the code. I will follow up with YARN-11236 after this PR is completed. |
JIRA: YARN-11235. Refactor Policy Code and Define getReservationHomeSubcluster.