Skip to content

Commit

Permalink
YARN-9323. FSLeafQueue#computeMaxAMResource does not override zero va…
Browse files Browse the repository at this point in the history
…lues for custom resources

(Contributed by Szilard Nemeth via Daniel Templeton)

Change-Id: Id844ccf09488f367c0c7de0a3b2d4aca1bba31cc
  • Loading branch information
Szilard Nemeth authored and templedf committed Feb 28, 2019
1 parent 7b928f1 commit 538bb48
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -832,4 +832,8 @@ public long getAggegatedReleasedContainers() {
public long getAggregatePreemptedContainers() {
return aggregateContainersPreempted.value();
}

public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
return queueMetricsForCustomResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,8 @@ Map<String, Long> getReservedValues() {
QueueMetricsCustomResource getAggregatePreemptedSeconds() {
return aggregatePreemptedSeconds;
}

public QueueMetricsCustomResource getAvailable() {
return available;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -42,6 +43,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -517,6 +520,29 @@ private Resource computeMaxAMResource() {
getMaxShare().getVirtualCores()));
}

QueueMetricsForCustomResources metricsForCustomResources =
scheduler.getRootQueueMetrics().getQueueMetricsForCustomResources();

if (metricsForCustomResources != null) {
QueueMetricsCustomResource availableResources =
metricsForCustomResources.getAvailable();

// We expect all custom resources contained in availableResources,
// so we will loop through all of them.
for (Map.Entry<String, Long> availableEntry : availableResources
.getValues().entrySet()) {
String resourceName = availableEntry.getKey();

// We only update the value if fairshare is 0 for that resource.
if (maxResource.getResourceValue(resourceName) == 0) {
Long availableValue = availableEntry.getValue();
long value = Math.min(availableValue,
getMaxShare().getResourceValue(resourceName));
maxResource.setResourceValue(resourceName, value);
}
}
}

// Round up to allow AM to run when there is only one vcore on the cluster
return Resources.multiplyAndRoundUp(maxResource, maxAMShare);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,8 @@ static FSQueueMetrics forQueue(MetricsSystem ms, String queueName,

return (FSQueueMetrics)metrics;
}

FSQueueMetricsForCustomResources getCustomResources() {
return customResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
Expand All @@ -42,19 +43,26 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

public class TestFSLeafQueue extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
private Resource maxResource = Resources.createResource(1024 * 8);
private static final float MAX_AM_SHARE = 0.5f;
private static final String CUSTOM_RESOURCE = "test1";

@Before
public void setup() throws IOException {
Expand Down Expand Up @@ -105,6 +113,8 @@ public void test() throws Exception {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queueMaxAMShareDefault>" + MAX_AM_SHARE +
"</queueMaxAMShareDefault>");
out.println("<queue name=\"queueA\"></queue>");
out.println("<queue name=\"queueB\"></queue>");
out.println("</allocations>");
Expand Down Expand Up @@ -221,4 +231,128 @@ public void run() {
assertTrue("Test failed with exception(s)" + exceptions,
exceptions.isEmpty());
}

@Test
public void testCanRunAppAMReturnsTrue() {
conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
ResourceUtils.resetResourceTypes(conf);

resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();

Resource maxShare = Resource.newInstance(1024 * 8, 4,
ImmutableMap.of(CUSTOM_RESOURCE, 10L));

// Add a node to increase available memory and vcores in scheduler's
// root queue metrics
addNodeToScheduler(Resource.newInstance(4096, 10,
ImmutableMap.of(CUSTOM_RESOURCE, 25L)));

FSLeafQueue queue = setupQueue(maxShare);

//Min(availableMemory, maxShareMemory (maxResourceOverridden))
// --> Min(4096, 8192) = 4096
//Min(availableVCores, maxShareVCores (maxResourceOverridden))
// --> Min(10, 4) = 4
//Min(available test1, maxShare test1 (maxResourceOverridden))
// --> Min(25, 10) = 10
//MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE
// --> 2048 MB memory, 2 vcores, 5 test1
Resource expectedAMShare = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 5L));

Resource appAMResource = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 3L));

Map<String, Long> customResourceValues =
verifyQueueMetricsForCustomResources(queue);

boolean result = queue.canRunAppAM(appAMResource);
assertTrue("AM should have been allocated!", result);

verifyAMShare(queue, expectedAMShare, customResourceValues);
}

private FSLeafQueue setupQueue(Resource maxShare) {
String queueName = "root.queue1";
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
schedulable.setMaxShare(new ConfigurableResource(maxShare));
schedulable.setMaxAMShare(MAX_AM_SHARE);
return schedulable;
}

@Test
public void testCanRunAppAMReturnsFalse() {
conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
ResourceUtils.resetResourceTypes(conf);

resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();

Resource maxShare = Resource.newInstance(1024 * 8, 4,
ImmutableMap.of(CUSTOM_RESOURCE, 10L));

// Add a node to increase available memory and vcores in scheduler's
// root queue metrics
addNodeToScheduler(Resource.newInstance(4096, 10,
ImmutableMap.of(CUSTOM_RESOURCE, 25L)));

FSLeafQueue queue = setupQueue(maxShare);

//Min(availableMemory, maxShareMemory (maxResourceOverridden))
// --> Min(4096, 8192) = 4096
//Min(availableVCores, maxShareVCores (maxResourceOverridden))
// --> Min(10, 4) = 4
//Min(available test1, maxShare test1 (maxResourceOverridden))
// --> Min(25, 10) = 10
//MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE
// --> 2048 MB memory, 2 vcores, 5 test1
Resource expectedAMShare = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 5L));

Resource appAMResource = Resource.newInstance(2048, 2,
ImmutableMap.of(CUSTOM_RESOURCE, 6L));

Map<String, Long> customResourceValues =
verifyQueueMetricsForCustomResources(queue);

boolean result = queue.canRunAppAM(appAMResource);
assertFalse("AM should not have been allocated!", result);

verifyAMShare(queue, expectedAMShare, customResourceValues);
}

private void addNodeToScheduler(Resource node1Resource) {
RMNode node1 = MockNodes.newNodeInfo(0, node1Resource, 1, "127.0.0.2");
scheduler.handle(new NodeAddedSchedulerEvent(node1));
}

private void verifyAMShare(FSLeafQueue schedulable,
Resource expectedAMShare, Map<String, Long> customResourceValues) {
Resource actualAMShare = Resource.newInstance(
schedulable.getMetrics().getMaxAMShareMB(),
schedulable.getMetrics().getMaxAMShareVCores(), customResourceValues);
long customResourceValue =
actualAMShare.getResourceValue(CUSTOM_RESOURCE);

//make sure to verify custom resource value explicitly!
assertEquals(5L, customResourceValue);
assertEquals("AM share is not the expected!", expectedAMShare,
actualAMShare);
}

private Map<String, Long> verifyQueueMetricsForCustomResources(
FSLeafQueue schedulable) {
QueueMetricsCustomResource maxAMShareCustomResources =
schedulable.getMetrics().getCustomResources().getMaxAMShare();
Map<String, Long> customResourceValues = maxAMShareCustomResources
.getValues();
assertNotNull("Queue metrics for custom resources should not be null!",
maxAMShareCustomResources);
assertNotNull("Queue metrics for custom resources resource values " +
"should not be null!", customResourceValues);
return customResourceValues;
}
}

0 comments on commit 538bb48

Please sign in to comment.