Skip to content

YARN-10891. Extend QueueInfo with max-parallel-apps in CS. #3314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public static QueueInfo newInstance(String queueName,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled, float weight) {
boolean preemptionDisabled, float weight,
int maxParallelApps) {
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setQueuePath(queuePath);
Expand All @@ -75,6 +76,7 @@ public static QueueInfo newInstance(String queueName,
queueInfo.setQueueStatistics(queueStatistics);
queueInfo.setPreemptionDisabled(preemptionDisabled);
queueInfo.setWeight(weight);
queueInfo.setMaxParallelApps(maxParallelApps);
return queueInfo;
}

Expand All @@ -86,14 +88,14 @@ public static QueueInfo newInstance(String queueName,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled, float weight,
boolean preemptionDisabled, float weight, int maxParallelApps,
Map<String, QueueConfigurations> queueConfigurations) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, queuePath, capacity,
maximumCapacity, currentCapacity,
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled, weight);
preemptionDisabled, weight, maxParallelApps);
queueInfo.setQueueConfigurations(queueConfigurations);
return queueInfo;
}
Expand All @@ -106,15 +108,15 @@ public static QueueInfo newInstance(String queueName,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled, float weight,
boolean preemptionDisabled, float weight, int maxParallelApps,
Map<String, QueueConfigurations> queueConfigurations,
boolean intraQueuePreemptionDisabled) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, queuePath, capacity,
maximumCapacity, currentCapacity,
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled, weight, queueConfigurations);
preemptionDisabled, weight, maxParallelApps, queueConfigurations);
queueInfo.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
return queueInfo;
}
Expand Down Expand Up @@ -166,6 +168,18 @@ public static QueueInfo newInstance(String queueName,
@Private
@Unstable
public abstract void setWeight(float weight);

/**
* Get the <em>configured max parallel apps</em> of the queue.
* @return <em>configured max parallel apps</em> of the queue
*/
@Public
@Stable
public abstract int getMaxParallelApps();

@Private
@Unstable
public abstract void setMaxParallelApps(int maxParallelApps);

/**
* Get the <em>maximum capacity</em> of the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ message QueueInfoProto {
optional bool intraQueuePreemptionDisabled = 13;
optional float weight = 14;
optional string queuePath = 15;
optional int32 maxParallelApps = 16;
}

message QueueConfigurationsProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
writer.println(df.format(queueInfo.getMaximumCapacity() * 100) + "%");
writer.print("\tWeight : ");
writer.println(df.format(queueInfo.getWeight()));
writer.print("\tMaximum Parallel Apps : ");
writer.println(queueInfo.getMaxParallelApps());
writer.print("\tDefault Node Label expression : ");
String nodeLabelExpression = queueInfo.getDefaultNodeLabelExpression();
nodeLabelExpression =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ public List<NodeReport> createFakeNodeReports() {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", "root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING, null,
null, null, false, -1.0f,
null, null, false, -1.0f, 10,
null, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1722,8 +1722,8 @@ public void testGetQueueInfo() throws Exception {
newInstance("queueA", "root.queueA",
0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels,
"GPU", null, false, -1.0f, null,
false);
"GPU", null, false, -1.0f, 10,
null, false);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
Expand All @@ -1738,6 +1738,7 @@ public void testGetQueueInfo() throws Exception {
pw.println("\tCurrent Capacity : " + "50.00%");
pw.println("\tMaximum Capacity : " + "80.00%");
pw.println("\tWeight : " + "-1.00");
pw.println("\tMaximum Parallel Apps : " + "10");
pw.println("\tDefault Node Label expression : " + "GPU");
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
pw.println("\tPreemption : " + "enabled");
Expand Down Expand Up @@ -1895,7 +1896,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
newInstance("queueA", "root.queueA",
0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null, null,
true, -1.0f, null, true);
true, -1.0f, 10, null, true);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
Expand All @@ -1910,6 +1911,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
pw.println("\tCurrent Capacity : " + "50.00%");
pw.println("\tMaximum Capacity : " + "80.00%");
pw.println("\tWeight : " + "-1.00");
pw.println("\tMaximum Parallel Apps : " + "10");
pw.println("\tDefault Node Label expression : "
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
pw.println("\tAccessible Node Labels : ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ public void setWeight(float weight) {
builder.setWeight(weight);
}

@Override
public int getMaxParallelApps() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxParallelApps()) ? p.getMaxParallelApps() : -1;
}

@Override
public void setMaxParallelApps(int weight) {
maybeInitBuilder();
builder.setMaxParallelApps(weight);
}

@Override
public void setChildQueues(List<QueueInfo> childQueues) {
if (childQueues == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public static void setup() throws Exception {
typeValueCache.put(QueueInfo.class, QueueInfo.
newInstance("root", "root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
"x && y", null, false, -1.0f, null, false));
"x && y", null, false, -1.0f, 10, null, false));
generateByNewInstance(QueueStatistics.class);
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ protected QueueInfo getQueueInfo() {
getIntraQueuePreemptionDisabled());
queueInfo.setQueueConfigurations(getQueueConfigurations());
queueInfo.setWeight(queueCapacities.getWeight());
queueInfo.setMaxParallelApps(maxParallelApps);
return queueInfo;
}

Expand Down Expand Up @@ -1494,6 +1495,7 @@ public void setMaxParallelApps(int maxParallelApps) {
this.maxParallelApps = maxParallelApps;
}

@Override
public int getMaxParallelApps() {
return maxParallelApps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @return current run-state
*/
public QueueState getState();

/**
* Get the max-parallel-applications property of the queue
* @return max-parallel-applications
*/
public int getMaxParallelApps();

/**
* Get child queues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class CapacitySchedulerQueueInfo {
protected float weight;
protected float normalizedWeight;
protected int numApplications;
protected int maxParallelApps;
protected String queueName;
protected boolean isAbsoluteResource;
protected QueueState state;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class CapacitySchedulerQueueInfo {
weight = q.getQueueCapacities().getWeight();
normalizedWeight = q.getQueueCapacities().getNormalizedWeight();
numApplications = q.getNumApplications();
maxParallelApps = q.getMaxParallelApps();
allocatedContainers = q.getMetrics().getAllocatedContainers();
pendingContainers = q.getMetrics().getPendingContainers();
reservedContainers = q.getMetrics().getReservedContainers();
Expand Down Expand Up @@ -352,6 +354,10 @@ public float getNormalizedWeight() {
return normalizedWeight;
}

public int getMaxParallelApps() {
return maxParallelApps;
}

public String getDefaultNodeLabelExpression() {
return defaultNodeLabelExpression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private void mockQueue(String queueName,
QueueInfo queueInfo = QueueInfo.
newInstance(queueName, queuePath, 0, 0,
0, null, null,
null, null, null, null, false, -1.0f, null, false);
null, null, null, null, false, -1.0f, 10, null, false);
when(confScheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
.thenReturn(queueInfo);
Queue queue = mock(Queue.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private Queue createQueue(String name, Queue parent, float capacity) {
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
QueueInfo queueInfo = QueueInfo.newInstance(name,
"root." + name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
null, QueueState.RUNNING, null, "", null, false, -1.0f, 10, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5101,7 +5101,7 @@ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
QueueInfo queueInfo = QueueInfo.
newInstance(name, path, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
null, QueueState.RUNNING, null, "", null, false, -1.0f, 10, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.resourcemanager.webapp;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -83,6 +84,7 @@ private class QueueInfo {
float absoluteMaxCapacity;
float absoluteUsedCapacity;
int numApplications;
int maxParallelApps;
String queueName;
private String queuePath;
String state;
Expand Down Expand Up @@ -140,6 +142,7 @@ private static void setupQueueConfiguration(
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
config.setCapacity(A, 10.5f);
config.setMaximumCapacity(A, 50);
config.setInt(CapacitySchedulerConfiguration.getQueuePrefix(A) + MAX_PARALLEL_APPLICATIONS, 42);

final String B = CapacitySchedulerConfiguration.ROOT + ".b";
config.setCapacity(B, 89.5f);
Expand Down Expand Up @@ -311,6 +314,8 @@ public void verifySubQueueXML(Element qElem, String q,
WebServicesTestUtils.getXmlFloat(qElem, "absoluteUsedCapacity");
qi.numApplications =
WebServicesTestUtils.getXmlInt(qElem, "numApplications");
qi.maxParallelApps =
WebServicesTestUtils.getXmlInt(qElem, "maxParallelApps");
qi.queueName = WebServicesTestUtils.getXmlString(qElem, "queueName");
qi.queuePath = WebServicesTestUtils.getXmlString(qElem, "queuePath");
qi.state = WebServicesTestUtils.getXmlString(qElem, "state");
Expand Down Expand Up @@ -424,10 +429,10 @@ private void verifyClusterSchedulerGeneric(String type, float usedCapacity,
private void verifySubQueue(JSONObject info, String q,
float parentAbsCapacity, float parentAbsMaxCapacity)
throws JSONException, Exception {
int numExpectedElements = 37;
int numExpectedElements = 38;
boolean isParentQueue = true;
if (!info.has("queues")) {
numExpectedElements = 55;
numExpectedElements = 56;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());
Expand All @@ -440,6 +445,7 @@ private void verifySubQueue(JSONObject info, String q,
qi.absoluteMaxCapacity = (float) info.getDouble("absoluteMaxCapacity");
qi.absoluteUsedCapacity = (float) info.getDouble("absoluteUsedCapacity");
qi.numApplications = info.getInt("numApplications");
qi.maxParallelApps = info.getInt("maxParallelApps");
qi.queueName = info.getString("queueName");
qi.queuePath = info.getString("queuePath");
qi.state = info.getString("state");
Expand Down Expand Up @@ -526,7 +532,9 @@ private void verifySubQueueGeneric(String q, QueueInfo info,
+ " queue is not configured in Absolute resource",
info.isAbsoluteResource);
}

assertEquals("maxParallelApps doesn't match " + q,
(q.equals("root.a") ? 42 : Integer.MAX_VALUE),
info.maxParallelApps);
}

private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)
Expand Down
Loading