Skip to content

YARN-11014. YARN incorrectly validates maximum capacity resources on … #3715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2239,6 +2239,22 @@ private void refreshLabelToNodeCache(Set<String> updateLabels) {
}
}

/**
 * Registers the given scheduler nodes with this scheduler's nodeTracker while holding
 * the write lock. Intended for the configuration validation path, where a fresh
 * CS instance is created and needs to know about the nodes of the cluster.
 *
 * @param nodesToAdd nodes to be added to the nodeTracker
 */
public void addNodes(List<FiCaSchedulerNode> nodesToAdd) {
  writeLock.lock();
  try {
    nodesToAdd.forEach(schedulerNode -> nodeTracker.addNode(schedulerNode));
  } finally {
    // Always release the lock, even if adding a node throws.
    writeLock.unlock();
  }
}

private void addNode(RMNode nodeManager) {
writeLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ private CapacitySchedulerConfigValidator() {
public static boolean validateCSConfiguration(
final Configuration oldConf, final Configuration newConf,
final RMContext rmContext) throws IOException {
CapacityScheduler liveScheduler = (CapacityScheduler) rmContext.getScheduler();
CapacityScheduler newCs = new CapacityScheduler();
try {
//TODO: extract all the validation steps and replace reinitialize with
//the specific validation steps
newCs.setConf(oldConf);
newCs.setRMContext(rmContext);
newCs.init(oldConf);
newCs.addNodes(liveScheduler.getAllNodes());
newCs.reinitialize(newConf, rmContext, true);
return true;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -34,9 +44,71 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.junit.Assert.fail;

public class TestCapacitySchedulerConfigValidator {
// Memory of every mock node; multiplied by GB when the nodes are created in setupNodes().
public static final int NODE_MEMORY = 16;
// Vcore counts of the three mock nodes nm1, nm2 and nm3 respectively.
public static final int NODE1_VCORES = 8;
public static final int NODE2_VCORES = 10;
public static final int NODE3_VCORES = 12;
// Additional resources handed to every mock node: two units of the GPU resource.
public static final Map<String, Long> NODE_GPU = ImmutableMap.of(GPU_URI, 2L);
// Multiplier for the memory figures in this class (presumably MB per GB - confirm).
public static final int GB = 1024;

// Short names of the queues used by the absolute-resource validation tests.
private static final String PARENT_A = "parentA";
private static final String PARENT_B = "parentB";
private static final String LEAF_A = "leafA";
private static final String LEAF_B = "leafB";

// Full queue paths: root.parentA, root.parentA.leafA, root.parentB, root.parentB.leafB.
private static final String PARENT_A_FULL_PATH = CapacitySchedulerConfiguration.ROOT
+ "." + PARENT_A;
private static final String LEAF_A_FULL_PATH = PARENT_A_FULL_PATH
+ "." + LEAF_A;
private static final String PARENT_B_FULL_PATH = CapacitySchedulerConfiguration.ROOT
+ "." + PARENT_B;
private static final String LEAF_B_FULL_PATH = PARENT_B_FULL_PATH
+ "." + LEAF_B;

// Minimum resources configured for parentA/leafA and parentB/leafB in setupCSConfiguration().
private final Resource A_MINRES = Resource.newInstance(16 * GB, 10);
private final Resource B_MINRES = Resource.newInstance(32 * GB, 5);
// Maximum for parentB/leafB; matches the sum of the three mock nodes
// (3 x 16 GB memory, 8 + 10 + 12 vcores).
private final Resource FULL_MAXRES = Resource.newInstance(48 * GB, 30);
// Maximum for parentA/leafA; only a part of the cluster.
private final Resource PARTIAL_MAXRES = Resource.newInstance(16 * GB, 10);
// Same memory as PARTIAL_MAXRES but with more vcores than the three mock nodes provide.
private final Resource VCORE_EXCEEDED_MAXRES = Resource.newInstance(16 * GB, 50);
// GPU-aware counterparts of the values above; assigned by setupGpuResourceValues().
private Resource A_MINRES_GPU;
private Resource B_MINRES_GPU;
private Resource FULL_MAXRES_GPU;
private Resource PARTIAL_MAXRES_GPU;
private Resource GPU_EXCEEDED_MAXRES_GPU;

// Mock cluster created by setUpMockRM() and setupNodes(); tests stop mockRM when done.
protected MockRM mockRM = null;
protected MockNM nm1 = null;
protected MockNM nm2 = null;
protected MockNM nm3 = null;
// The scheduler obtained from mockRM in setUpMockRM().
protected CapacityScheduler cs;

/**
 * Initializes {@link ResourceUtils} with the resource types used by the tests:
 * memory and vcores (with the default scheduler minimum/maximum allocations) and,
 * optionally, a countable GPU resource with a maximum allocation of 10.
 *
 * @param useGpu whether the GPU resource type should be registered as well
 */
public static void setupResources(boolean useGpu) {
  final Map<String, ResourceInformation> resourceTypes = new HashMap<>();

  // Memory, bounded by the default scheduler allocation limits.
  resourceTypes.put(ResourceInformation.MEMORY_URI,
      ResourceInformation.newInstance(
          ResourceInformation.MEMORY_MB.getName(),
          ResourceInformation.MEMORY_MB.getUnits(),
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));

  // Vcores, bounded by the default scheduler allocation limits.
  resourceTypes.put(ResourceInformation.VCORES_URI,
      ResourceInformation.newInstance(
          ResourceInformation.VCORES.getName(),
          ResourceInformation.VCORES.getUnits(),
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES));

  if (useGpu) {
    final ResourceInformation gpu = ResourceInformation.newInstance(
        ResourceInformation.GPU_URI, "", 0, ResourceTypes.COUNTABLE, 0, 10L);
    resourceTypes.put(ResourceInformation.GPU_URI, gpu);
  }

  ResourceUtils.initializeResourcesFromResourceInformationMap(resourceTypes);
}

/**
* Test for the case when the scheduler.minimum-allocation-mb == 0.
Expand Down Expand Up @@ -69,7 +141,6 @@ public void testValidateMemoryAllocationHIgherMinThanMaxMem() {

}


@Test
public void testValidateMemoryAllocation() {
Map<String, String> configs = new HashMap();
Expand Down Expand Up @@ -115,7 +186,6 @@ public void testValidateVCoresHigherMinThanMaxVCore() {

}


@Test
public void testValidateVCores() {
Map<String, String> configs = new HashMap();
Expand Down Expand Up @@ -147,6 +217,106 @@ public void testValidateCSConfigInvalidCapacity() {
}
}

@Test
public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxMemoryExceeded()
throws Exception {
setUpMockRM(false);
RMContext rmContext = mockRM.getRMContext();
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
CapacitySchedulerConfiguration newConfiguration =
new CapacitySchedulerConfiguration(cs.getConfiguration());
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
try {
CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
fail("Parent maximum capacity exceeded");
} catch (IOException e) {
Assert.assertTrue(e.getCause().getMessage()
.startsWith("Max resource configuration"));
} finally {
mockRM.stop();
}
Comment on lines +223 to +238
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering whether this block could be extracted like the following:

CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
 newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
boolean shouldFail = false;
validateCSConfig(newConfiguration, shouldFail);

}

/**
 * With the default resource calculator, setting leafA's maximum resource to
 * VCORE_EXCEEDED_MAXRES (more vcores than its parent's maximum) must still be accepted
 * by {@link CapacitySchedulerConfigValidator#validateCSConfiguration}.
 *
 * @throws Exception if the mock cluster cannot be set up
 */
@Test
public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxVcoreExceeded() throws Exception {
  try {
    // Set-up runs inside the try so the RM is stopped even if node registration fails.
    setUpMockRM(false);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
    } catch (IOException e) {
      // Same failure message as before, but keep the validation error as the cause
      // so the test report shows why the configuration was rejected.
      throw new AssertionError(
          "In DefaultResourceCalculator vcore limits are not enforced", e);
    }
  } finally {
    // mockRM stays null when its construction failed; nothing to stop in that case.
    if (mockRM != null) {
      mockRM.stop();
    }
  }
}

/**
 * With the dominant resource calculator, setting leafA's maximum resource to
 * FULL_MAXRES (more memory than its parent's maximum) must be rejected by
 * {@link CapacitySchedulerConfigValidator#validateCSConfiguration} with an
 * {@link IOException} whose cause message starts with "Max resource configuration".
 *
 * @throws Exception if the mock cluster cannot be set up
 */
@Test
public void testValidateCSConfigDominantRCAbsoluteModeParentMaxMemoryExceeded()
    throws Exception {
  try {
    // Set-up runs inside the try so the RM is stopped even if node registration fails.
    setUpMockRM(true);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      // Fail with a readable message instead of an NPE when no cause is attached.
      Throwable cause = e.getCause();
      Assert.assertNotNull("Validation failure is expected to carry a cause", cause);
      Assert.assertTrue(cause.getMessage()
          .startsWith("Max resource configuration"));
    }
  } finally {
    // mockRM stays null when its construction failed; nothing to stop in that case.
    if (mockRM != null) {
      mockRM.stop();
    }
  }
}

/**
 * With the dominant resource calculator, setting leafA's maximum resource to
 * VCORE_EXCEEDED_MAXRES (more vcores than its parent's maximum) must be rejected by
 * {@link CapacitySchedulerConfigValidator#validateCSConfiguration} with an
 * {@link IOException} whose cause message starts with "Max resource configuration".
 *
 * @throws Exception if the mock cluster cannot be set up
 */
@Test
public void testValidateCSConfigDominantRCAbsoluteModeParentMaxVcoreExceeded() throws Exception {
  try {
    // Set-up runs inside the try so the RM is stopped even if node registration fails.
    setUpMockRM(true);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      // Fail with a readable message instead of an NPE when no cause is attached.
      Throwable cause = e.getCause();
      Assert.assertNotNull("Validation failure is expected to carry a cause", cause);
      Assert.assertTrue(cause.getMessage()
          .startsWith("Max resource configuration"));
    }
  } finally {
    // mockRM stays null when its construction failed; nothing to stop in that case.
    if (mockRM != null) {
      mockRM.stop();
    }
  }
}

/**
 * With the dominant resource calculator, setting leafA's maximum resource to
 * GPU_EXCEEDED_MAXRES_GPU (more GPUs than its parent's maximum) must be rejected by
 * {@link CapacitySchedulerConfigValidator#validateCSConfiguration} with an
 * {@link IOException} whose cause message starts with "Max resource configuration".
 *
 * @throws Exception if the mock cluster cannot be set up
 */
@Test
public void testValidateCSConfigDominantRCAbsoluteModeParentMaxGPUExceeded() throws Exception {
  try {
    // Set-up runs inside the try so the RM is stopped even if node registration fails.
    setUpMockRM(true);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, GPU_EXCEEDED_MAXRES_GPU);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      // Fail with a readable message instead of an NPE when no cause is attached.
      Throwable cause = e.getCause();
      Assert.assertNotNull("Validation failure is expected to carry a cause", cause);
      Assert.assertTrue(cause.getMessage()
          .startsWith("Max resource configuration"));
    }
  } finally {
    // mockRM stays null when its construction failed; nothing to stop in that case.
    if (mockRM != null) {
      mockRM.stop();
    }
  }
}

@Test
public void testValidateCSConfigStopALeafQueue() throws IOException {
Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
Expand All @@ -155,7 +325,7 @@ public void testValidateCSConfigStopALeafQueue() throws IOException {
newConfig
.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
RMContext rmContext = prepareRMContext();
Boolean isValidConfig = CapacitySchedulerConfigValidator
boolean isValidConfig = CapacitySchedulerConfigValidator
.validateCSConfiguration(oldConfig, newConfig, rmContext);
Assert.assertTrue(isValidConfig);
}
Expand Down Expand Up @@ -340,9 +510,11 @@ public void testAddQueueToALeafQueue() throws IOException {
Assert.assertTrue(isValidConfig);
}


public static RMContext prepareRMContext() {
setupResources(false);
RMContext rmContext = Mockito.mock(RMContext.class);
CapacityScheduler mockCs = Mockito.mock(CapacityScheduler.class);
Mockito.when(rmContext.getScheduler()).thenReturn(mockCs);
LocalConfigurationProvider configProvider = Mockito
.mock(LocalConfigurationProvider.class);
Mockito.when(rmContext.getConfigurationProvider())
Expand All @@ -361,4 +533,94 @@ public static RMContext prepareRMContext() {
.thenReturn(queuePlacementManager);
return rmContext;
}

/**
 * Creates and starts a MockRM backed by a CapacityScheduler, stores the RM and its
 * scheduler in {@code mockRM} and {@code cs}, and registers the mock nodes.
 *
 * @param useDominantRC passed on to resource and queue set-up; when true the GPU
 *                      resource type and the dominant resource calculator are used
 * @throws Exception if starting the RM or registering a node fails
 */
private void setUpMockRM(boolean useDominantRC) throws Exception {
  final YarnConfiguration yarnConf = new YarnConfiguration();
  yarnConf.setClass(
      YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);

  // Resource types have to be initialized before the queue configuration is built.
  setupResources(useDominantRC);
  final CapacitySchedulerConfiguration schedulerConf =
      setupCSConfiguration(yarnConf, useDominantRC);

  mockRM = new MockRM(schedulerConf);
  cs = (CapacityScheduler) mockRM.getResourceScheduler();

  mockRM.start();
  cs.start();

  setupNodes(mockRM);
}

/**
 * Creates the three mock node managers (h1, h2, h3) against the given RM and
 * registers each of them right after it has been created.
 *
 * @param newMockRM the RM whose resource tracker service the nodes register with
 * @throws Exception if a node registration fails
 */
private void setupNodes(MockRM newMockRM) throws Exception {
  nm1 = newMockNode(newMockRM, "h1:1234", NODE1_VCORES);
  nm1.registerNode();

  nm2 = newMockNode(newMockRM, "h2:1234", NODE2_VCORES);
  nm2.registerNode();

  nm3 = newMockNode(newMockRM, "h3:1234", NODE3_VCORES);
  nm3.registerNode();
}

/**
 * Builds (but does not register) a mock node with the shared memory and GPU
 * capacity and the given number of vcores.
 */
private MockNM newMockNode(MockRM rm, String nodeIdStr, int vcores) {
  return new MockNM(nodeIdStr,
      Resource.newInstance(NODE_MEMORY * GB, vcores, NODE_GPU),
      rm.getResourceTrackerService(),
      YarnVersionInfo.getVersion());
}

/**
 * Fills in the GPU-aware resource fields by copying memory and vcores from their
 * non-GPU counterparts and attaching a GPU count to each of them.
 */
private void setupGpuResourceValues() {
  A_MINRES_GPU = withGpus(A_MINRES, 2L);
  B_MINRES_GPU = withGpus(B_MINRES, 2L);
  FULL_MAXRES_GPU = withGpus(FULL_MAXRES, 6L);
  PARTIAL_MAXRES_GPU = withGpus(PARTIAL_MAXRES, 4L);
  // Same memory/vcores as the partial maximum, but with a much larger GPU count.
  GPU_EXCEEDED_MAXRES_GPU = withGpus(PARTIAL_MAXRES, 50L);
}

/**
 * Returns a new resource with the memory and vcores of {@code base} plus the
 * given amount of the GPU resource.
 */
private static Resource withGpus(Resource base, long gpuCount) {
  return Resource.newInstance(base.getMemorySize(), base.getVirtualCores(),
      ImmutableMap.of(GPU_URI, gpuCount));
}

/**
 * Builds the capacity scheduler configuration used by the absolute-resource tests:
 * root has the children parentA and parentB, which own leafA and leafB respectively.
 * Minimum and maximum resources are set on all four queues for the default ("")
 * label; the GPU-aware values are used when {@code useDominantRC} is true.
 *
 * @param configuration base configuration to derive the scheduler configuration from
 * @param useDominantRC whether to configure the dominant resource calculator and the
 *                      GPU resource type
 * @return the populated scheduler configuration
 */
private CapacitySchedulerConfiguration setupCSConfiguration(YarnConfiguration configuration,
    boolean useDominantRC) {
  final CapacitySchedulerConfiguration schedulerConf =
      new CapacitySchedulerConfiguration(configuration);
  if (useDominantRC) {
    schedulerConf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class.getName());
    schedulerConf.set(YarnConfiguration.RESOURCE_TYPES, ResourceInformation.GPU_URI);
  }

  // Queue hierarchy: root -> {parentA -> leafA, parentB -> leafB}.
  schedulerConf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[]{PARENT_A, PARENT_B});
  schedulerConf.setQueues(PARENT_A_FULL_PATH, new String[]{LEAF_A});
  schedulerConf.setQueues(PARENT_B_FULL_PATH, new String[]{LEAF_B});

  // Pick the resource set once; the GPU variants only exist after they are built.
  final Resource minForA;
  final Resource minForB;
  final Resource maxForA;
  final Resource maxForB;
  if (useDominantRC) {
    setupGpuResourceValues();
    minForA = A_MINRES_GPU;
    minForB = B_MINRES_GPU;
    maxForA = PARTIAL_MAXRES_GPU;
    maxForB = FULL_MAXRES_GPU;
  } else {
    minForA = A_MINRES;
    minForB = B_MINRES;
    maxForA = PARTIAL_MAXRES;
    maxForB = FULL_MAXRES;
  }

  schedulerConf.setMinimumResourceRequirement("", PARENT_A_FULL_PATH, minForA);
  schedulerConf.setMinimumResourceRequirement("", PARENT_B_FULL_PATH, minForB);
  schedulerConf.setMinimumResourceRequirement("", LEAF_A_FULL_PATH, minForA);
  schedulerConf.setMinimumResourceRequirement("", LEAF_B_FULL_PATH, minForB);

  schedulerConf.setMaximumResourceRequirement("", PARENT_A_FULL_PATH, maxForA);
  schedulerConf.setMaximumResourceRequirement("", PARENT_B_FULL_PATH, maxForB);
  schedulerConf.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, maxForA);
  schedulerConf.setMaximumResourceRequirement("", LEAF_B_FULL_PATH, maxForB);

  return schedulerConf;
}
}