Skip to content
This repository has been archived by the owner on Jul 22, 2022. It is now read-only.

Commit

Permalink
YARN-10396. Max applications calculation per queue disregards queue l…
Browse files Browse the repository at this point in the history
…evel settings in absolute mode. Contributed by Benjamin Teke.
  • Loading branch information
pbacsko authored and Hexiaoqiao committed Nov 28, 2020
1 parent 00e48c3 commit 30542c2
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,18 @@ private void deriveCapacityFromAbsoluteConfigurations(String label,
if (childQueue instanceof LeafQueue) {
LeafQueue leafQueue = (LeafQueue) childQueue;
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
int maxApplications = (int) (conf.getMaximumSystemApplications()
* childQueue.getQueueCapacities().getAbsoluteCapacity(label));
int maxApplications =
conf.getMaximumApplicationsPerQueue(childQueue.getQueuePath());
if (maxApplications < 0) {
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = (int) (maxGlobalPerQueueApps *
childQueue.getQueueCapacities().getAbsoluteCapacity(label));
} else {
maxApplications = (int) (conf.getMaximumSystemApplications()
* childQueue.getQueueCapacities().getAbsoluteCapacity(label));
}
}
leafQueue.setMaxApplications(maxApplications);

int maxApplicationsPerUser = Math.min(maxApplications,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,17 @@ public void setUp() throws Exception {

private static final String A = "a";
private static final String B = "b";
private static final String Q_A =
CapacitySchedulerConfiguration.ROOT + "." + A;
private static final String Q_B =
CapacitySchedulerConfiguration.ROOT + "." + B;
private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {

// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B});

final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
conf.setCapacity(Q_A, 30);

final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
conf.setCapacity(Q_B, 70);

LOG.info("Setup top-level queues a and b");
Expand All @@ -130,11 +132,9 @@ private void setupSingleLevelQueuesWithAbsoluteResource(
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});

final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
conf.setMinimumResourceRequirement("", Q_A,
QUEUE_A_RESOURCE);

final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
conf.setMinimumResourceRequirement("", Q_B,
QUEUE_B_RESOURCE);

Expand Down Expand Up @@ -370,9 +370,7 @@ public void testSingleLevelQueues() throws Exception {
public void testSingleLevelQueuesPrecision() throws Exception {
// Setup queue configs
setupSingleLevelQueues(csConf);
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + "a";
csConf.setCapacity(Q_A, 30);
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + "b";
csConf.setCapacity(Q_B, 70.5F);

Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
Expand Down Expand Up @@ -436,10 +434,8 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});

final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
conf.setCapacity(Q_A, 10);

final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
conf.setCapacity(Q_B, 50);

final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
Expand Down Expand Up @@ -658,7 +654,6 @@ public void testQueueCapacitySettingChildZero() throws Exception {
setupMultiLevelQueues(csConf);

// set child queues capacity to 0 when parents not 0
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
csConf.setCapacity(Q_B + "." + B1, 0);
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);
Expand All @@ -675,9 +670,7 @@ public void testQueueCapacitySettingParentZero() throws Exception {
setupMultiLevelQueues(csConf);

// set parent capacity to 0 when child not 0
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
csConf.setCapacity(Q_B, 0);
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
csConf.setCapacity(Q_A, 60);

Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
Expand All @@ -692,13 +685,11 @@ public void testQueueCapacityZero() throws Exception {
setupMultiLevelQueues(csConf);

// set parent and child capacity to 0
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
csConf.setCapacity(Q_B, 0);
csConf.setCapacity(Q_B + "." + B1, 0);
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);

final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
csConf.setCapacity(Q_A, 60);

Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
Expand Down Expand Up @@ -1026,10 +1017,125 @@ public void testAbsoluteResourceWithChangeInClusterResource()
QUEUE_B_RESOURCE_70PERC);
}

@Test
public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
// Setup queue configs
setupSingleLevelQueuesWithAbsoluteResource(csConf);

Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
null, CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);

// Setup some nodes
int numNodes = 2;
final long memoryPerNode = (QUEUE_A_RESOURCE.getMemorySize() +
QUEUE_B_RESOURCE.getMemorySize()) / numNodes;
int coresPerNode = (QUEUE_A_RESOURCE.getVirtualCores() +
QUEUE_B_RESOURCE.getVirtualCores()) / numNodes;

Resource clusterResource = Resources.createResource(
numNodes * memoryPerNode, numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

// Start testing
// Only MaximumSystemApplications is set in csConf
LeafQueue a = (LeafQueue) queues.get(A);
LeafQueue b = (LeafQueue) queues.get(B);

float queueAScale = (float) QUEUE_A_RESOURCE.getMemorySize() /
(float) clusterResource.getMemorySize();
float queueBScale = (float) QUEUE_B_RESOURCE.getMemorySize() /
(float) clusterResource.getMemorySize();

assertEquals(queueAScale, a.getQueueCapacities().getCapacity(),
DELTA);
assertEquals(1f, a.getQueueCapacities().getMaximumCapacity(),
DELTA);
assertEquals(queueAScale, a.getQueueCapacities().getAbsoluteCapacity(),
DELTA);
assertEquals(1f,
a.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA);
assertEquals((int) (csConf.getMaximumSystemApplications() * queueAScale),
a.getMaxApplications());
assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());

assertEquals(queueBScale,
b.getQueueCapacities().getCapacity(), DELTA);
assertEquals(1f,
b.getQueueCapacities().getMaximumCapacity(), DELTA);
assertEquals(queueBScale,
b.getQueueCapacities().getAbsoluteCapacity(), DELTA);
assertEquals(1f,
b.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA);
assertEquals((int) (csConf.getMaximumSystemApplications() * queueBScale),
b.getMaxApplications());
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

// Set GlobalMaximumApplicationsPerQueue in csConf
csConf.setGlobalMaximumApplicationsPerQueue(20000);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() *
queueAScale), a.getMaxApplications());
assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() *
queueBScale), b.getMaxApplications());
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

// Set MaximumApplicationsPerQueue in csConf
int queueAMaxApplications = 30000;
int queueBMaxApplications = 30000;
csConf.set("yarn.scheduler.capacity." + Q_A + ".maximum-applications",
Integer.toString(queueAMaxApplications));
csConf.set("yarn.scheduler.capacity." + Q_B + ".maximum-applications",
Integer.toString(queueBMaxApplications));
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

assertEquals(queueAMaxApplications, a.getMaxApplications());
assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
assertEquals(queueBMaxApplications, b.getMaxApplications());
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

// Extra cases for testing maxApplicationsPerUser
int halfPercent = 50;
int oneAndQuarterPercent = 125;
a.getUsersManager().setUserLimit(halfPercent);
b.getUsersManager().setUserLimit(oneAndQuarterPercent);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

assertEquals(a.getMaxApplications() * halfPercent / 100,
a.getMaxApplicationsPerUser());
// Q_B's limit per user shouldn't be greater
// than the whole queue's application limit
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

float userLimitFactorQueueA = 0.9f;
float userLimitFactorQueueB = 1.1f;
a.getUsersManager().setUserLimit(halfPercent);
a.getUsersManager().setUserLimitFactor(userLimitFactorQueueA);
b.getUsersManager().setUserLimit(100);
b.getUsersManager().setUserLimitFactor(userLimitFactorQueueB);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

assertEquals((int) (a.getMaxApplications() * halfPercent *
userLimitFactorQueueA / 100), a.getMaxApplicationsPerUser());
// Q_B's limit per user shouldn't be greater
// than the whole queue's application limit
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

}

@After
public void tearDown() throws Exception {
}

private ResourceLimits anyResourceLimits() {
return any(ResourceLimits.class);
}
Expand Down

0 comments on commit 30542c2

Please sign in to comment.