YARN-11573. Add config option to make container allocation prefer nodes without reserved containers #6098

Merged · 4 commits · Sep 22, 2023
ActivityDiagnosticConstant.java
@@ -107,4 +107,5 @@ public class ActivityDiagnosticConstant {
  public final static String
      NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED =
      "Node can't find a container to be unreserved when needed";
  public static final String NODE_HAS_BEEN_RESERVED = "Node has been reserved";
}
CapacitySchedulerConfiguration.java
@@ -154,6 +154,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration
  @Private
  public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;

  public static final String SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = PREFIX
      + "skip-allocate-on-nodes-with-reserved-containers";

  @Private
  public static final boolean DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = false;

  @Private
  public static final String MAXIMUM_ALLOCATION = "maximum-allocation";
@@ -938,6 +944,11 @@ public boolean getReservationContinueLook() {
        DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
  }

  public boolean getSkipAllocateOnNodesWithReservedContainer() {
    return getBoolean(SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
        DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS);
  }

  private static String getAclKey(QueueACL acl) {
    return "acl_" + StringUtils.toLowerCase(acl.toString());
  }
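Since `PREFIX` in `CapacitySchedulerConfiguration` is `yarn.scheduler.capacity.`, the new property's full key works out to `yarn.scheduler.capacity.skip-allocate-on-nodes-with-reserved-containers`. A minimal sketch of toggling it programmatically, assuming that standard prefix (on a real cluster the same key would go into `capacity-scheduler.xml`):

```java
// Sketch: flip the new flag and read it back through the accessor added
// above. setBoolean/getBoolean are inherited from Hadoop's Configuration.
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setBoolean(
    CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
    true);
// Now returns true instead of the default (false).
boolean skip = csConf.getSkipAllocateOnNodesWithReservedContainer();
```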
RegularContainerAllocator.java
@@ -24,8 +24,11 @@
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.Container;
@@ -850,9 +853,23 @@ private ContainerAllocation allocate(Resource clusterResource,

    Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
        candidates);

    while (iter.hasNext()) {
      FiCaSchedulerNode node = iter.next();

      // Do not schedule if there are any reservations to fulfill on the
      // node. Note the iter.hasNext() guard: the last remaining candidate
      // is never skipped, so one fallback node is always considered.
      if (iter.hasNext() &&
          node.getReservedContainer() != null &&
          isSkipAllocateOnNodesWithReservedContainer()) {
        LOG.debug("Skipping scheduling on node {} since it has already been"
            + " reserved by {}", node.getNodeID(),
            node.getReservedContainer().getContainerId());
        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
            activitiesManager, node, application, schedulerKey,
            ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);
        continue;
      }

      if (reservedContainer == null) {
        result = preCheckForNodeCandidateSet(node,
            schedulingMode, resourceLimits, schedulerKey);
@@ -894,7 +911,19 @@ private ContainerAllocation allocate(Resource clusterResource,

    return result;
  }

  private boolean isSkipAllocateOnNodesWithReservedContainer() {
    ResourceScheduler scheduler = rmContext.getScheduler();
    boolean skipAllocateOnNodesWithReservedContainer = false;
    if (scheduler instanceof CapacityScheduler) {
      CapacityScheduler cs = (CapacityScheduler) scheduler;
      CapacitySchedulerConfiguration csConf = cs.getConfiguration();
      skipAllocateOnNodesWithReservedContainer =
          csConf.getSkipAllocateOnNodesWithReservedContainer();
    }
    return skipAllocateOnNodesWithReservedContainer;
  }

  @Override
  public CSAssignment assignContainers(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
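To make the loop's skip semantics concrete, here is a toy, self-contained model of the candidate iteration above (`Node`, `skipFlag`, and the class are stand-ins invented for this sketch, not YARN types): reserved nodes are bypassed while further candidates remain, but the final candidate is always considered because `iter.hasNext()` is false for it.

```java
import java.util.Iterator;
import java.util.List;

// Toy model of the candidate loop in RegularContainerAllocator#allocate.
public class SkipReservedNodesDemo {
  record Node(String id, boolean reserved) { }

  public static void main(String[] args) {
    boolean skipFlag = true; // stands in for the new config option, enabled
    List<Node> candidates = List.of(
        new Node("nm1", true),   // reserved -> skipped
        new Node("nm2", false),  // free -> considered
        new Node("nm3", true));  // reserved, but last -> still considered
    Iterator<Node> iter = candidates.iterator();
    while (iter.hasNext()) {
      Node node = iter.next();
      if (iter.hasNext() && node.reserved() && skipFlag) {
        System.out.println("skip " + node.id());
        continue;
      }
      System.out.println("consider " + node.id());
    }
    // Output: skip nm1 / consider nm2 / consider nm3
  }
}
```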
TestCapacitySchedulerMultiNodes.java
@@ -25,12 +25,16 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;

import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.slf4j.Logger;
@@ -478,4 +482,110 @@ public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
    rm.stop();
  }

  @Test(timeout=30000)
  public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception {
    CapacitySchedulerConfiguration newConf =
        new CapacitySchedulerConfiguration(conf);
    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
    newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
        + ".resource-based.sorting-interval.ms", 0);
    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", 1.0f);
    newConf.set(CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
        "true");
    MockRM rm1 = new MockRM(newConf);

    rm1.start();
    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
    MockNM nm2 = rm1.registerNode("127.0.0.2:1235", 8 * GB);

    // Launch an app in the default queue; its AM container should start on nm1.
    RMApp app1 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
        .createWithMemory(5 * GB, rm1)
        .withAppName("app")
        .withUser("user")
        .withQueue("default")
        .build());
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // Launch another app in the default queue; its AM container should start on nm2.
    RMApp app2 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
        .createWithMemory(5 * GB, rm1)
        .withAppName("app")
        .withUser("user")
        .withQueue("default")
        .build());
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    FiCaSchedulerApp schedulerApp1 =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp schedulerApp2 =
        cs.getApplicationAttempt(am2.getApplicationAttemptId());

    // Request a 4 GB container for app1; it cannot fit on either node
    // (both 8 GB nodes already host a 5 GB AM), so a reservation is created.
    am1.allocate("*", 4 * GB, 1, new ArrayList<>());
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    // Check reservations of app1 and app2.
    Set<RMNode> reservedContainers = checkReservedContainers(cs,
        rm1.getRMContext().getRMNodes(), 1);
    Assert.assertEquals(1, reservedContainers.size());
    RMNode nodeWithReservedContainer = reservedContainers.iterator().next();
    LOG.debug("Reserved container on: {}", nodeWithReservedContainer);

    // Move the reservation to nm1 for easier testing.
    if (nodeWithReservedContainer.getNodeID().getHost().startsWith("127.0.0.2")) {
      moveReservation(cs, rm1, nm1, nm2, am1);
    }
    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
    Assert.assertNull(cs.getNode(nm2.getNodeId()).getReservedContainer());

    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());

    // Make sure there is available headroom on the child queue
    // (see RegularContainerAllocator#checkHeadroom); without it,
    // RegularContainerAllocator#preCheckForNodeCandidateSet would return
    // ContainerAllocation.QUEUE_SKIPPED.
    MockNM nm3 = rm1.registerNode("127.0.0.3:1235", 3 * GB);

    // Request a container for app2; with the new flag enabled we expect it
    // to end up on nm2, as nm1 already holds a reservation for another app.
    am2.allocate("*", 4 * GB, 1, new ArrayList<>());
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());

    rm1.close();
  }

  private static void moveReservation(CapacityScheduler cs,
      MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) {
    RMNode sourceNode = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    RMNode targetNode = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    SchedulerApplicationAttempt firstSchedulerAppAttempt =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp app = (FiCaSchedulerApp) firstSchedulerAppAttempt;
    RMContainer reservedContainer =
        cs.getNode(sourceNode.getNodeID()).getReservedContainer();
    LOG.debug("Moving reservation");
    app.moveReservation(reservedContainer,
        cs.getNode(sourceNode.getNodeID()), cs.getNode(targetNode.getNodeID()));
  }

  private static Set<RMNode> checkReservedContainers(CapacityScheduler cs,
      ConcurrentMap<NodeId, RMNode> rmNodes, int expectedNumberOfContainers) {
    Set<RMNode> result = new HashSet<>();
    for (Map.Entry<NodeId, RMNode> entry : rmNodes.entrySet()) {
      if (cs.getNode(entry.getKey()).getReservedContainer() != null) {
        result.add(entry.getValue());
      }
    }

    Assert.assertEquals(expectedNumberOfContainers, result.size());
    return result;
  }
}