Skip to content

Commit ed20aa3

Browse files
author
slfan1989
committed
YARN-7707. BackPort [GPG] Policy generator framework.
1 parent 4bd873b commit ed20aa3

File tree

18 files changed

+1778
-2
lines changed

18 files changed

+1778
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4366,6 +4366,37 @@ public static boolean isAclEnabled(Configuration conf) {
43664366
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
43674367
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(30);
43684368

4369+
public static final String FEDERATION_GPG_POLICY_PREFIX =
4370+
FEDERATION_GPG_PREFIX + "policy.generator.";
4371+
4372+
/** The interval at which the policy generator runs, default is one hour. */
4373+
public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
4374+
FEDERATION_GPG_POLICY_PREFIX + "interval-ms";
4375+
public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = TimeUnit.HOURS.toMillis(1);
4376+
4377+
/**
4378+
* The configured policy generator class, runs NoOpGlobalPolicy by
4379+
* default.
4380+
*/
4381+
public static final String GPG_GLOBAL_POLICY_CLASS = FEDERATION_GPG_POLICY_PREFIX + "class";
4382+
public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
4383+
"org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." +
4384+
"NoOpGlobalPolicy";
4385+
4386+
/**
4387+
* Whether or not the policy generator is running in read only (won't modify
4388+
* policies), default is false.
4389+
*/
4390+
public static final String GPG_POLICY_GENERATOR_READONLY =
4391+
FEDERATION_GPG_POLICY_PREFIX + "readonly";
4392+
public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = false;
4393+
4394+
/**
4395+
* Which sub-clusters the policy generator should blacklist, default is none.
4396+
*/
4397+
public static final String GPG_POLICY_GENERATOR_BLACKLIST =
4398+
FEDERATION_GPG_POLICY_PREFIX + "blacklist";
4399+
43694400
/**
43704401
* Connection and Read timeout from the Router to RM.
43714402
*/

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5376,4 +5376,36 @@
53765376
<value>false</value>
53775377
</property>
53785378

5379+
<property>
5380+
<description>
5381+
The interval at which the policy generator runs, default is one hour
5382+
</description>
5383+
<name>yarn.federation.gpg.policy.generator.interval-ms</name>
5384+
<value>1h</value>
5385+
</property>
5386+
5387+
<property>
5388+
<description>
5389+
The configured policy generator class, runs NoOpGlobalPolicy by default
5390+
</description>
5391+
<name>yarn.federation.gpg.policy.generator.class</name>
5392+
<value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value>
5393+
</property>
5394+
5395+
<property>
5396+
<description>
5397+
Whether or not the policy generator is running in read only (won't modify policies), default is false
5398+
</description>
5399+
<name>yarn.federation.gpg.policy.generator.readonly</name>
5400+
<value>false</value>
5401+
</property>
5402+
5403+
<property>
5404+
<description>
5405+
Which sub-clusters the policy generator should blacklist, default is none
5406+
</description>
5407+
<name>yarn.federation.gpg.policy.generator.blacklist</name>
5408+
<value></value>
5409+
</property>
5410+
53795411
</configuration>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
6666
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
6767
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
68+
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
6869
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
6970
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
7071
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -311,6 +312,18 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(final String queue)
311312
}
312313
}
313314

315+
/**
316+
* Set a policy configuration into the state store.
317+
*
318+
* @param policyConf the policy configuration to set
319+
* @throws YarnException if the request is invalid/fails
320+
*/
321+
public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
322+
throws YarnException {
323+
stateStore.setPolicyConfiguration(
324+
SetSubClusterPolicyConfigurationRequest.newInstance(policyConf));
325+
}
326+
314327
/**
315328
* Get the policies that are represented as
316329
* {@link SubClusterPolicyConfiguration} for all currently active queues in

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@
6161
<scope>test</scope>
6262
</dependency>
6363

64+
<dependency>
65+
<groupId>org.apache.hadoop</groupId>
66+
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
67+
<scope>provided</scope>
68+
</dependency>
69+
6470
<dependency>
6571
<groupId>org.apache.hadoop</groupId>
6672
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
@@ -72,6 +78,12 @@
7278
<scope>test</scope>
7379
</dependency>
7480

81+
<dependency>
82+
<groupId>org.mockito</groupId>
83+
<artifactId>mockito-all</artifactId>
84+
<scope>test</scope>
85+
</dependency>
86+
7587
<dependency>
7688
<groupId>org.apache.hadoop</groupId>
7789
<artifactId>hadoop-yarn-server-common</artifactId>
@@ -93,6 +105,12 @@
93105
<plugin>
94106
<groupId>org.apache.rat</groupId>
95107
<artifactId>apache-rat-plugin</artifactId>
108+
<configuration>
109+
<excludes>
110+
<exclude>src/test/resources/schedulerInfo1.json</exclude>
111+
<exclude>src/test/resources/schedulerInfo2.json</exclude>
112+
</excludes>
113+
</configuration>
96114
</plugin>
97115
</plugins>
98116
</build>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,8 @@ public interface GPGContext {
2828
FederationStateStoreFacade getStateStoreFacade();
2929

3030
void setStateStoreFacade(FederationStateStoreFacade facade);
31+
32+
GPGPolicyFacade getPolicyFacade();
33+
34+
void setPolicyFacade(GPGPolicyFacade facade);
3135
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
public class GPGContextImpl implements GPGContext {
2727

2828
private FederationStateStoreFacade facade;
29+
private GPGPolicyFacade policyFacade;
2930

3031
@Override
3132
public FederationStateStoreFacade getStateStoreFacade() {
@@ -38,4 +39,13 @@ public void setStateStoreFacade(
3839
this.facade = federationStateStoreFacade;
3940
}
4041

42+
@Override
43+
public GPGPolicyFacade getPolicyFacade(){
44+
return policyFacade;
45+
}
46+
47+
@Override
48+
public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){
49+
policyFacade = gpgPolicyfacade;
50+
}
4151
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
18+
package org.apache.hadoop.yarn.server.globalpolicygenerator;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
22+
import org.apache.hadoop.yarn.exceptions.YarnException;
23+
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
24+
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
25+
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
26+
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
27+
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
28+
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
29+
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
30+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
31+
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import java.util.HashMap;
36+
import java.util.Map;
37+
38+
/**
39+
* A utility class for the GPG Policy Generator to read and write policies
40+
* into the FederationStateStore. Policy specific logic is abstracted away in
41+
* this class, so the PolicyGenerator can avoid dealing with policy
42+
* construction, reinitialization, and serialization.
43+
*
44+
* There are only two exposed methods:
45+
*
46+
* {@link #getPolicyManager(String)}
47+
* Gets the PolicyManager via queue name. Null if there is no policy
48+
* configured for the specified queue. The PolicyManager can be used to
49+
* extract the {@link FederationRouterPolicy} and
50+
* {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters
51+
*
52+
* {@link #setPolicyManager(FederationPolicyManager)}
53+
* Sets the PolicyManager. If the policy configuration is the same, no change
54+
* occurs. Otherwise, the internal cache is updated and the new configuration
55+
* is written into the FederationStateStore.
56+
*
57+
* This class assumes that the GPG is the only service
58+
* writing policies. Thus, the only FederationStateStore reads occur the first
59+
* time a queue policy is retrieved - after that, the GPG only writes to the
60+
* FederationStateStore.
61+
*
62+
* The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
63+
* cache. The primary uses for these caches are to serve reads, and to
64+
* identify when the PolicyGenerator has actually changed the policy
65+
* so unnecessary FederationStateStore policy writes can be avoided.
66+
*/
67+
68+
public class GPGPolicyFacade {
69+
70+
private static final Logger LOG =
71+
LoggerFactory.getLogger(GPGPolicyFacade.class);
72+
73+
private FederationStateStoreFacade stateStore;
74+
75+
private Map<String, FederationPolicyManager> policyManagerMap;
76+
private Map<String, SubClusterPolicyConfiguration> policyConfMap;
77+
78+
private boolean readOnly;
79+
80+
public GPGPolicyFacade(FederationStateStoreFacade stateStore,
81+
Configuration conf) {
82+
this.stateStore = stateStore;
83+
this.policyManagerMap = new HashMap<>();
84+
this.policyConfMap = new HashMap<>();
85+
this.readOnly =
86+
conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
87+
YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
88+
}
89+
90+
/**
91+
* Provides a utility for the policy generator to read the policy manager
92+
* from the FederationStateStore. Because the policy generator should be the
93+
* only component updating the policy, this implementation does not use the
94+
* reinitialization feature.
95+
*
96+
* @param queueName the name of the queue we want the policy manager for.
97+
* @return the policy manager for the queue, or null if none is configured.
98+
* @throws YarnException if reading the policy or creating its manager fails.
99+
*/
100+
public FederationPolicyManager getPolicyManager(String queueName)
101+
throws YarnException {
102+
FederationPolicyManager policyManager = policyManagerMap.get(queueName);
103+
// If we don't have the policy manager cached, pull configuration
104+
// from the FederationStateStore to create and cache it
105+
if (policyManager == null) {
106+
try {
107+
// If we don't have the configuration cached, pull it
108+
// from the stateStore
109+
SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);
110+
if (conf == null) {
111+
conf = stateStore.getPolicyConfiguration(queueName);
112+
}
113+
// If configuration is still null, it does not exist in the
114+
// FederationStateStore
115+
if (conf == null) {
116+
LOG.info("Read null policy for queue {}", queueName);
117+
return null;
118+
}
119+
policyManager =
120+
FederationPolicyUtils.instantiatePolicyManager(conf.getType());
121+
policyManager.setQueue(queueName);
122+
123+
// TODO there is currently no way to cleanly deserialize a policy
124+
// manager sub type from just the configuration
125+
if (policyManager instanceof WeightedLocalityPolicyManager) {
126+
WeightedPolicyInfo wpinfo =
127+
WeightedPolicyInfo.fromByteBuffer(conf.getParams());
128+
WeightedLocalityPolicyManager wlpmanager =
129+
(WeightedLocalityPolicyManager) policyManager;
130+
LOG.info("Updating policy for queue {} to configured weights router: "
131+
+ "{}, amrmproxy: {}", queueName,
132+
wpinfo.getRouterPolicyWeights(),
133+
wpinfo.getAMRMPolicyWeights());
134+
wlpmanager.setWeightedPolicyInfo(wpinfo);
135+
} else {
136+
LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
137+
+ "initialization may be incomplete ", policyManager.getClass());
138+
}
139+
140+
policyManagerMap.put(queueName, policyManager);
141+
policyConfMap.put(queueName, conf);
142+
} catch (YarnException e) {
143+
LOG.error("Error reading SubClusterPolicyConfiguration from state "
144+
+ "store for queue: {}", queueName);
145+
throw e;
146+
}
147+
}
148+
return policyManager;
149+
}
150+
151+
/**
152+
* Provides a utility for the policy generator to write a policy manager
153+
* into the FederationStateStore. The facade keeps a cache and will only write
154+
* into the FederationStateStore if the policy configuration has changed.
155+
*
156+
* @param policyManager The policy manager we want to update into the state
157+
* store. It contains policy information as well as
158+
* the queue name we will update for.
159+
* @throws YarnException if serializing the policy or writing it to the store fails.
160+
*/
161+
public void setPolicyManager(FederationPolicyManager policyManager)
162+
throws YarnException {
163+
if (policyManager == null) {
164+
LOG.warn("Attempting to set null policy manager");
165+
return;
166+
}
167+
// Extract the configuration from the policy manager
168+
String queue = policyManager.getQueue();
169+
SubClusterPolicyConfiguration conf;
170+
try {
171+
conf = policyManager.serializeConf();
172+
} catch (FederationPolicyInitializationException e) {
173+
LOG.warn("Error serializing policy for queue {}", queue);
174+
throw e;
175+
}
176+
if (conf == null) {
177+
// State store does not currently support setting a policy back to null
178+
// because it reads the queue name to set from the policy!
179+
LOG.warn("Skip setting policy to null for queue {} into state store",
180+
queue);
181+
return;
182+
}
183+
// Compare with configuration cache, if different, write the conf into
184+
// store and update our conf and manager cache
185+
if (!confCacheEqual(queue, conf)) {
186+
try {
187+
if (readOnly) {
188+
LOG.info("[read-only] Skipping policy update for queue {}", queue);
189+
return;
190+
}
191+
LOG.info("Updating policy for queue {} into state store", queue);
192+
stateStore.setPolicyConfiguration(conf);
193+
policyConfMap.put(queue, conf);
194+
policyManagerMap.put(queue, policyManager);
195+
} catch (YarnException e) {
196+
LOG.warn("Error writing SubClusterPolicyConfiguration to state "
197+
+ "store for queue: {}", queue);
198+
throw e;
199+
}
200+
} else {
201+
LOG.info("Setting unchanged policy - state store write skipped");
202+
}
203+
}
204+
205+
/**
206+
* @param queue the queue to check the cached policy configuration for
207+
* @param conf the new policy configuration
208+
* @return whether or not the conf is equal to the cached conf
209+
*/
210+
private boolean confCacheEqual(String queue,
211+
SubClusterPolicyConfiguration conf) {
212+
SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
213+
if (conf == null && cachedConf == null) {
214+
return true;
215+
} else if (conf != null && cachedConf != null) {
216+
if (conf.equals(cachedConf)) {
217+
return true;
218+
}
219+
}
220+
return false;
221+
}
222+
}

0 commit comments

Comments
 (0)