Skip to content

Commit

Permalink
YARN-3272. Surface container locality info in RM web UI (Jian He via wangda)
Browse files Browse the repository at this point in the history

(cherry picked from commit e17e5ba)
  • Loading branch information
wangdatan committed Mar 3, 2015
1 parent 7e6624c commit 0a502c6
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 40 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ Release 2.7.0 - UNRELEASED
YARN-3281. Added RMStateStore to StateMachine visualization list.
(Chengbing Liu via jianhe)

YARN-3272. Surface container locality info in RM web UI.
(Jian He via wangda)

OPTIMIZATIONS

YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@
<Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.rmapp\.attempt\.RMAppAttemptImpl.*" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!--
  Child elements of a single Match are combined with AND, so each
  method/bug-pattern pair needs a Match of its own; listing two Method and
  two Bug elements in one Match would never match any warning.
-->
<!-- getLocalityStatistics hands out the internal statistics array. -->
<Match>
  <Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.rmapp\.attempt\.RMAppAttemptMetrics" />
  <Method name="getLocalityStatistics" />
  <Bug pattern="EI_EXPOSE_REP" />
</Match>
<!-- incNumAllocatedContainers increments a volatile counter. -->
<Match>
  <Class name="~org\.apache\.hadoop\.yarn\.server\.resourcemanager\.rmapp\.attempt\.RMAppAttemptMetrics" />
  <Method name="incNumAllocatedContainers" />
  <Bug pattern="VO_VOLATILE_INCREMENT" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl$AppRejectedTransition" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.resource.Resources;

public class RMAppAttemptMetrics {
Expand All @@ -49,6 +50,10 @@ public class RMAppAttemptMetrics {
private AtomicLong finishedVcoreSeconds = new AtomicLong(0);
private RMContext rmContext;

private int[][] localityStatistics =
new int[NodeType.values().length][NodeType.values().length];
private volatile int totalAllocatedContainers;

public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
RMContext rmContext) {
this.attemptId = attemptId;
Expand All @@ -57,7 +62,7 @@ public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
this.writeLock = lock.writeLock();
this.rmContext = rmContext;
}

public void updatePreemptionInfo(Resource resource, RMContainer container) {
try {
writeLock.lock();
Expand Down Expand Up @@ -126,4 +131,18 @@ public void updateAggregateAppResourceUsage(long finishedMemorySeconds,
this.finishedMemorySeconds.addAndGet(finishedMemorySeconds);
this.finishedVcoreSeconds.addAndGet(finishedVcoreSeconds);
}

/**
 * Records one allocated container in the locality statistics.
 *
 * <p>The statistics form a matrix addressed by
 * {@code [containerType.index][requestType.index]}; the matching cell and
 * the overall container total are each incremented by one.
 *
 * <p>Neither increment is atomic. NOTE(review): this presumably relies on
 * callers serializing allocations for one attempt - confirm against the
 * scheduler.
 *
 * @param containerType locality of the container that was allocated
 * @param requestType locality of the resource request it satisfied
 */
public void incNumAllocatedContainers(NodeType containerType,
    NodeType requestType) {
  final int row = containerType.index;
  final int column = requestType.index;
  localityStatistics[row][column]++;
  // The volatile total is deliberately kept as the last write.
  totalAllocatedContainers++;
}

/**
 * Returns a snapshot of the locality statistics.
 *
 * <p>The result is addressed as {@code [containerType.index][requestType.index]}.
 * A deep copy is returned so that callers cannot modify the counters held
 * by this object, and later allocations do not change a matrix a caller is
 * already reading.
 *
 * @return a newly allocated copy of the locality matrix, never {@code null}
 */
public int[][] getLocalityStatistics() {
  final int[][] snapshot = new int[localityStatistics.length][];
  for (int i = 0; i < snapshot.length; i++) {
    // clone() on an int[] row copies the primitive values themselves.
    snapshot[i] = localityStatistics[i].clone();
  }
  return snapshot;
}

/**
 * Returns the number of containers recorded so far through
 * {@code incNumAllocatedContainers}.
 *
 * @return the current value of the allocated-container counter
 */
public int getTotalAllocatedContainers() {
  return totalAllocatedContainers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
* Resource classification.
*/
public enum NodeType {
NODE_LOCAL,
RACK_LOCAL,
OFF_SWITCH
NODE_LOCAL(0), RACK_LOCAL(1), OFF_SWITCH(2);
// Numeric position of this type (0, 1 or 2 as assigned above);
// RMAppAttemptMetrics uses it to address its locality statistics matrix.
// NOTE(review): the field is public and not final, so any caller could
// reassign it - consider making it final.
public int index;

// Stores the explicit index given in the constant declaration.
private NodeType(int index) {
this.index = index;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class SchedulerApplicationAttempt {
private long lastVcoreSeconds = 0;

protected final AppSchedulingInfo appSchedulingInfo;

protected ApplicationAttemptId attemptId;
protected Map<ContainerId, RMContainer> liveContainers =
new HashMap<ContainerId, RMContainer>();
protected final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
Expand Down Expand Up @@ -132,6 +133,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
activeUsersManager, rmContext.getEpoch());
this.queue = queue;
this.pendingRelease = new HashSet<ContainerId>();
this.attemptId = applicationAttemptId;
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
Expand Down Expand Up @@ -619,4 +621,15 @@ public synchronized void recoverContainer(RMContainer rmContainer) {
// schedulingOpportunities
// lastScheduledContainer
}

/**
 * Forwards one container allocation to the metrics of the application's
 * current attempt so it is counted in the locality statistics.
 *
 * <p>The update is best-effort: nothing is recorded when the RM context has
 * no application map, when the application is not in that map, or when it
 * has no current attempt. The constructor guards its own lookup the same
 * way, so these cases are expected and must not fail the allocation path
 * with a {@link NullPointerException}.
 *
 * @param containerType locality of the container that was allocated
 * @param requestType locality of the resource request it satisfied
 */
public void incNumAllocatedContainers(NodeType containerType,
    NodeType requestType) {
  if (rmContext.getRMApps() == null) {
    return;
  }
  // Single lookup (rather than containsKey + get) so a concurrent removal
  // cannot slip in between the check and the use.
  org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp rmApp =
      rmContext.getRMApps().get(attemptId.getApplicationId());
  if (rmApp == null) {
    return;
  }
  RMAppAttempt attempt = rmApp.getCurrentAppAttempt();
  if (attempt != null) {
    attempt.getRMAppAttemptMetrics().incNumAllocatedContainers(containerType,
        requestType);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.TreeSet;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand Down Expand Up @@ -1242,15 +1243,25 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();

NodeType requestType = null;
MutableObject allocatedContainer = new MutableObject();
// Data-local
ResourceRequest nodeLocalResourceRequest =
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {

//update locality statistics
if (allocatedContainer.getValue() != null) {
application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
requestType);
}
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
}
}
Expand All @@ -1262,12 +1273,23 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
if (!rackLocalResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}

assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,

if (requestType != NodeType.NODE_LOCAL) {
requestType = NodeType.RACK_LOCAL;
}

assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {

//update locality statistics
if (allocatedContainer.getValue() != null) {
application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
requestType);
}
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
}
}
Expand All @@ -1279,11 +1301,21 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
if (!offSwitchResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
if (requestType != NodeType.NODE_LOCAL
&& requestType != NodeType.RACK_LOCAL) {
requestType = NodeType.OFF_SWITCH;
}

assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
allocatedContainer);

return new CSAssignment(assignOffSwitchContainers(clusterResource,
offSwitchResourceRequest, node, application, priority,
reservedContainer, needToUnreserve),
NodeType.OFF_SWITCH);
// update locality statistics
if (allocatedContainer.getValue() != null) {
application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
}
return new CSAssignment(assigned, NodeType.OFF_SWITCH);
}

return SKIP_ASSIGNMENT;
Expand Down Expand Up @@ -1370,40 +1402,43 @@ protected boolean checkLimitsToReserve(Resource clusterResource,
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
needToUnreserve);
needToUnreserve, allocatedContainer);
}

return Resources.none();
}

private Resource assignRackLocalContainers(Resource clusterResource,
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
private Resource assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
needToUnreserve);
needToUnreserve, allocatedContainer);
}

return Resources.none();
}

private Resource assignOffSwitchContainers(Resource clusterResource,
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
needToUnreserve);
needToUnreserve, allocatedContainer);
}

return Resources.none();
Expand Down Expand Up @@ -1487,7 +1522,7 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
boolean needToUnreserve) {
boolean needToUnreserve, MutableObject createdContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
Expand Down Expand Up @@ -1592,7 +1627,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
" container=" + container +
" queue=" + this +
" clusterResource=" + clusterResource);

createdContainer.setValue(allocatedContainer);
return container.getResource();
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,55 @@ protected void render(Block html) {
table._();
div._();

createContainerLocalityTable(html, attemptMetrics);
createResourceRequestsTable(html, app);
}

/**
 * Renders the container-locality section of the page: a heading with the
 * total number of allocated containers, an explanatory heading, and a table
 * with one row per container locality and one column per request locality.
 *
 * <p>Only the lower-left triangle of the matrix is printed; the remaining
 * cells are rendered as empty strings. Nothing is rendered when
 * {@code attemptMetrics} is {@code null}.
 *
 * @param html block the section is appended to
 * @param attemptMetrics metrics of the attempt being displayed, may be null
 */
private void createContainerLocalityTable(Block html,
    RMAppAttemptMetrics attemptMetrics) {
  if (attemptMetrics == null) {
    return;
  }

  DIV<Hamlet> div = html.div(_INFO_WRAP);

  String totalHeading =
      "Total Allocated Containers: "
          + attemptMetrics.getTotalAllocatedContainers();
  String legend =
      "Each table cell"
          + " represents the number of NodeLocal/RackLocal/OffSwitch containers"
          + " satisfied by NodeLocal/RackLocal/OffSwitch resource requests.";
  TABLE<DIV<Hamlet>> table =
      div.h3(totalHeading).h3(legend).table("#containerLocality");

  // Header row: an empty corner cell followed by the request localities.
  table.tr()
      .th(_TH, "")
      .th(_TH, "Node Local Request")
      .th(_TH, "Rack Local Request")
      .th(_TH, "Off Switch Request")
      ._();

  String[] rowLabels = {
      "Num Node Local Containers (satisfied by)",
      "Num Rack Local Containers (satisfied by)",
      "Num Off Switch Containers (satisfied by)"
  };

  int[][] stats = attemptMetrics.getLocalityStatistics();
  for (int row = 0; row < stats.length; row++) {
    String nodeLocalCell = String.valueOf(stats[row][0]);
    String rackLocalCell = (row == 0) ? "" : String.valueOf(stats[row][1]);
    String offSwitchCell = (row <= 1) ? "" : String.valueOf(stats[row][2]);

    // Rows alternate odd/even styling, starting with odd for the first row.
    table.tr((row % 2 == 0) ? _ODD : _EVEN)
        .td(rowLabels[row])
        .td(nodeLocalCell)
        .td(rackLocalCell)
        .td(offSwitchCell)
        ._();
  }

  table._();
  div._();
}

private void createResourceRequestsTable(Block html, AppInfo app) {
TBODY<TABLE<Hamlet>> tbody =
html.table("#ResourceRequests").thead().tr()
.th(".priority", "Priority")
.th(".resourceName", "ResourceName")
.th(".resourceName", "Resource Name")
.th(".totalResource", "Capability")
.th(".numContainers", "NumContainers")
.th(".relaxLocality", "RelaxLocality")
.th(".nodeLabelExpression", "NodeLabelExpression")._()._().tbody();
.th(".numContainers", "Num Containers")
.th(".relaxLocality", "Relax Locality")
.th(".nodeLabelExpression", "Node Label Expression")._()._().tbody();

Resource totalResource = Resource.newInstance(0, 0);
if (app.getResourceRequests() != null) {
Expand Down
Loading

0 comments on commit 0a502c6

Please sign in to comment.