Skip to content

Commit cff2b62

Browse files
committed
YARN-11536. [Federation] Router CLI Supports Batch Save the SubClusterPolicyConfiguration Of Queues.
1 parent 82c8070 commit cff2b62

File tree

10 files changed

+344
-4
lines changed

10 files changed

+344
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.api.protocolrecords;
19+
20+
import org.apache.hadoop.classification.InterfaceAudience.Private;
21+
import org.apache.hadoop.classification.InterfaceAudience.Public;
22+
import org.apache.hadoop.classification.InterfaceStability.Unstable;
23+
import org.apache.hadoop.yarn.util.Records;
24+
25+
import java.util.List;
26+
27+
/**
28+
* In Federation mode,
29+
* we will support batch save queues policies to FederationStateStore.
30+
*/
31+
@Private
32+
@Unstable
33+
public abstract class BatchSaveFederationQueuePoliciesRequest {
34+
35+
@Private
36+
@Unstable
37+
public static BatchSaveFederationQueuePoliciesRequest newInstance(
38+
List<FederationQueueWeight> federationQueueWeights) {
39+
BatchSaveFederationQueuePoliciesRequest request =
40+
Records.newRecord(BatchSaveFederationQueuePoliciesRequest.class);
41+
request.setFederationQueueWeights(federationQueueWeights);
42+
return request;
43+
}
44+
45+
@Public
46+
@Unstable
47+
public abstract List<FederationQueueWeight> getFederationQueueWeights();
48+
49+
@Private
50+
@Unstable
51+
public abstract void setFederationQueueWeights(
52+
List<FederationQueueWeight> federationQueueWeights);
53+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ public static FederationQueueWeight newInstance(String routerWeight,
7575
return federationQueueWeight;
7676
}
7777

78+
@Private
79+
@Unstable
80+
public static FederationQueueWeight newInstance(String routerWeight,
81+
String amrmWeight, String headRoomAlpha, String queue, String policyManagerClassName) {
82+
FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class);
83+
federationQueueWeight.setRouterWeight(routerWeight);
84+
federationQueueWeight.setAmrmWeight(amrmWeight);
85+
federationQueueWeight.setHeadRoomAlpha(headRoomAlpha);
86+
federationQueueWeight.setQueue(queue);
87+
federationQueueWeight.setPolicyManagerClassName(policyManagerClassName);
88+
return federationQueueWeight;
89+
}
90+
7891
@Public
7992
@Unstable
8093
public abstract String getRouterWeight();
@@ -166,4 +179,20 @@ public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnExce
166179
protected static boolean isNumeric(String value) {
167180
return NumberUtils.isCreatable(value);
168181
}
182+
183+
@Public
184+
@Unstable
185+
public abstract String getQueue();
186+
187+
@Public
188+
@Unstable
189+
public abstract void setQueue(String queue);
190+
191+
@Public
192+
@Unstable
193+
public abstract String getPolicyManagerClassName();
194+
195+
@Public
196+
@Unstable
197+
public abstract void setPolicyManagerClassName(String policyManagerClassName);
169198
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,14 @@ message SaveFederationQueuePolicyResponseProto {
180180
required string message = 1;
181181
}
182182

183+
message BatchSaveFederationQueuePoliciesRequestProto {
184+
repeated FederationQueueWeightProto federationQueueWeights = 1;
185+
}
186+
187+
message BatchSaveFederationQueuePoliciesResponseProto {
188+
required string message = 1;
189+
}
190+
183191
//////////////////////////////////////////////////////////////////
184192
///////////// RM Failover related records ////////////////////////
185193
//////////////////////////////////////////////////////////////////

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,8 @@ message FederationQueueWeightProto {
444444
optional string routerWeight = 1;
445445
optional string amrmWeight = 2;
446446
optional string headRoomAlpha = 3;
447+
optional string queue = 4;
448+
optional string policyManagerClassName = 5;
447449
}
448450

449451
////////////////////////////////////////////////////////////////////////

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public class RouterCLI extends Configured implements Tool {
7171
"set the state of the subCluster to SC_LOST."))
7272
// Command2: policy
7373
.put("-policy", new UsageInfo(
74-
"[-s|--save [queue;router weight;amrm weight;headroomalpha]]",
74+
"[-s|--save [queue;router weight;amrm weight;headroomalpha]] " +
75+
"[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]",
7576
"We provide a set of commands for Policy:" +
7677
" Include list policies, save policies, batch save policies. " +
7778
" (Note: The policy type will be directly read from the" +
@@ -102,8 +103,18 @@ public class RouterCLI extends Configured implements Tool {
102103
// Command2: policy
103104
// save policy
104105
private static final String OPTION_S = "s";
106+
107+
private static final String OPTION_BATCH_S = "bs";
108+
105109
private static final String OPTION_SAVE = "save";
110+
111+
private static final String OPTION_BATCH_SAVE = "batch-save";
112+
113+
private static final String OPTION_FORMAT = "format";
114+
106115
private static final String CMD_POLICY = "-policy";
116+
private static final String FORMAT_XML = "xml";
117+
private static final String FORMAT_JSON = "json";
107118

108119
public RouterCLI() {
109120
super();
@@ -161,7 +172,8 @@ private static void printHelp() {
161172
.append("The full syntax is: \n\n")
162173
.append("routeradmin\n")
163174
.append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n")
164-
.append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n")
175+
.append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha] " +
176+
"[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]]\n")
165177
.append(" [-help [cmd]]").append("\n");
166178
StringBuilder helpBuilder = new StringBuilder();
167179
System.out.println(summary);
@@ -304,7 +316,20 @@ private int handlePolicy(String[] args)
304316
"We will save the policy information of the queue, " +
305317
"including queue and weight information");
306318
saveOpt.setOptionalArg(true);
319+
Option batchSaveOpt = new Option(OPTION_BATCH_S, OPTION_BATCH_SAVE, false,
320+
"We will save queue policies in bulk, " +
321+
"where users can provide XML or JSON files containing the policies. " +
322+
"This command will parse the file contents and store the results " +
323+
"in the FederationStateStore.");
324+
Option formatOpt = new Option(null, "format", true,
325+
"Users can specify the file format using this option. " +
326+
"Currently, there are two supported file formats: XML and JSON. " +
327+
"These files contain the policy information for storing queue policies.");
328+
formatOpt.setOptionalArg(true);
329+
307330
opts.addOption(saveOpt);
331+
opts.addOption(batchSaveOpt);
332+
opts.addOption(formatOpt);
308333

309334
// Parse command line arguments.
310335
CommandLine cliParser;
@@ -317,6 +342,7 @@ private int handlePolicy(String[] args)
317342
}
318343

319344
// Try to parse the cmd save.
345+
// Save a single queue policy
320346
if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) {
321347
String policy = cliParser.getOptionValue(OPTION_S);
322348
if (StringUtils.isBlank(policy)) {
@@ -325,6 +351,21 @@ private int handlePolicy(String[] args)
325351
return handleSavePolicy(policy);
326352
}
327353

354+
// Save Queue Policies in Batches
355+
if (cliParser.hasOption(OPTION_BATCH_S) || cliParser.hasOption(OPTION_BATCH_SAVE)) {
356+
if (cliParser.hasOption(OPTION_FORMAT)) {
357+
String format = cliParser.getOptionValue(OPTION_FORMAT);
358+
if (StringUtils.isBlank(format) ||
359+
StringUtils.equalsAnyIgnoreCase(format, FORMAT_XML, FORMAT_JSON)) {
360+
System.out.println("We currently only support policy configuration files " +
361+
"in XML and JSON formats.");
362+
return EXIT_ERROR;
363+
}
364+
System.out.println("format:" + format);
365+
}
366+
System.out.println("We need to specify the format of the input file.");
367+
}
368+
328369
return EXIT_ERROR;
329370
}
330371

@@ -342,6 +383,12 @@ private int handleSavePolicy(String policy) {
342383
}
343384
}
344385

386+
private int handBatchSavePolicies(String format, String policyFile) {
387+
LOG.info("Batch Save Federation Policies. Format = {}, PolicyFile = {}.",
388+
format, policyFile);
389+
return 1;
390+
}
391+
345392
/**
346393
* We will parse the policy, and it has specific formatting requirements.
347394
*
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.client.util;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
/**
24+
* This is a memory paging utility that is used to paginate a dataset.
25+
*
26+
* This class is designed to support batch entry queue policies.
27+
*/
28+
public class MemoryPageUtils<T> {
29+
private List<T> dataList;
30+
private int pageSize;
31+
32+
/**
33+
* MemoryPageUtils constructor.
34+
*
35+
* @param pageSize Number of records returned per page.
36+
*/
37+
public MemoryPageUtils(int pageSize) {
38+
this.pageSize = pageSize;
39+
this.dataList = new ArrayList<>();
40+
}
41+
42+
public void addToMemory(T data) {
43+
dataList.add(data);
44+
}
45+
46+
public List<T> readFromMemory(int pageNumber) {
47+
int startIndex = pageNumber * pageSize;
48+
int endIndex = Math.min(startIndex + pageSize, dataList.size());
49+
if (startIndex >= dataList.size()) {
50+
return null;
51+
}
52+
return dataList.subList(startIndex, endIndex);
53+
}
54+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.client;
19+
20+
import org.apache.hadoop.yarn.client.util.MemoryPageUtils;
21+
import org.junit.Test;
22+
23+
import java.util.List;
24+
25+
import static org.junit.Assert.assertEquals;
26+
import static org.junit.Assert.assertNull;
27+
28+
/**
29+
* The purpose of this class is to test
30+
* whether the memory paging function is as expected.
31+
*/
32+
public class TestMemoryPageUtils {
33+
34+
@Test
35+
public void testMemoryPage() {
36+
// We design such a unit test for testing pagination, and we prepare 6 pieces of policy data.
37+
// If 1 page is followed by 5 pieces of data, we will get 2 pages.
38+
// Page 1 will contain 5 records and page 2 will contain 1 record.
39+
MemoryPageUtils<String> policies = new MemoryPageUtils<>(5);
40+
policies.addToMemory("policy-1");
41+
policies.addToMemory("policy-2");
42+
policies.addToMemory("policy-3");
43+
policies.addToMemory("policy-4");
44+
policies.addToMemory("policy-5");
45+
policies.addToMemory("policy-6");
46+
47+
// Page 1 will return 5 records.
48+
List<String> firstPage = policies.readFromMemory(0);
49+
assertEquals(5, firstPage.size());
50+
51+
// Page 2 will return 1 records
52+
List<String> secondPage = policies.readFromMemory(1);
53+
assertEquals(1, secondPage.size());
54+
55+
// Page 10, This is a wrong number of pages, we will get null.
56+
List<String> tenPage = policies.readFromMemory(10);
57+
assertNull(tenPage);
58+
}
59+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ private DeregisterSubClusterResponse generateAllSubClusterData() {
120120
public void testHelp() throws Exception {
121121
ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
122122
ByteArrayOutputStream dataErr = new ByteArrayOutputStream();
123-
System.setOut(new PrintStream(dataOut));
124-
System.setErr(new PrintStream(dataErr));
123+
// System.setOut(new PrintStream(dataOut));
124+
// System.setErr(new PrintStream(dataErr));
125125

126126
String[] args = {"-help"};
127127
rmAdminCLI.run(args);
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?xml version="1.0"?>
2+
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
3+
4+
<!--
5+
Licensed to the Apache Software Foundation (ASF) under one or more
6+
contributor license agreements. See the NOTICE file distributed with
7+
this work for additional information regarding copyright ownership.
8+
The ASF licenses this file to You under the Apache License, Version 2.0
9+
(the "License"); you may not use this file except in compliance with
10+
the License. You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing, software
15+
distributed under the License is distributed on an "AS IS" BASIS,
16+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
See the License for the specific language governing permissions and
18+
limitations under the License.
19+
-->
20+
21+
<federationWeights>
22+
<weight>
23+
<queue>
24+
<name>root.a</name>
25+
<amrmPolicyWeights>
26+
<subClusterIdInfo>
27+
<id>SC-1</id>
28+
<weight>0.7</weight>
29+
</subClusterIdInfo>
30+
<subClusterIdInfo>
31+
<id>SC-2</id>
32+
<weight>0.3</weight>
33+
</subClusterIdInfo>
34+
</amrmPolicyWeights>
35+
<routerPolicyWeights>
36+
<subClusterIdInfo>
37+
<id>SC-1</id>
38+
<weight>0.6</weight>
39+
</subClusterIdInfo>
40+
<subClusterIdInfo>
41+
<id>SC-2</id>
42+
<weight>0.4</weight>
43+
</subClusterIdInfo>
44+
</routerPolicyWeights>
45+
<headroomAlpha>1.0</headroomAlpha>
46+
</queue>
47+
</weight>
48+
</federationWeights>

0 commit comments

Comments
 (0)