Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public class ResourceTrackerService extends AbstractService implements
private boolean checkIpHostnameInRegistration;
private boolean timelineServiceV2Enabled;

private String clusterId;

public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
NMLivelinessMonitor nmLivelinessMonitor,
Expand Down Expand Up @@ -264,6 +266,7 @@ protected void serviceStart() throws Exception {
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID, YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
}

@Override
Expand Down Expand Up @@ -419,7 +422,7 @@ public RegisterNodeManagerResponse registerNodeManager(
.getCurrentKey());

RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion, physicalResource);
resolve(host), capability, nodeManagerVersion, physicalResource, clusterId);

RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ public interface RMNode {
* @return hostname of this node
*/
public String getHostName();


/**
* the clusterId of this node
* @return clusterId of this node
*/
public String getClusterID();

/**
* the command port for this node
* @return command port for this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final NodeId nodeId;
private final RMContext context;
private final String hostName;
private String clusterId;
private final int commandPort;
private int httpPort;
private final String nodeAddress; // The containerManager address
Expand Down Expand Up @@ -378,12 +380,12 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion) {
this(nodeId, context, hostName, cmPort, httpPort, node, capability,
nodeManagerVersion, null);
nodeManagerVersion, null, YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
}

public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion, Resource physResource) {
String nodeManagerVersion, Resource physResource, String clusterId) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
Expand All @@ -410,6 +412,8 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();

this.containerAllocationExpirer = context.getContainerAllocationExpirer();

this.clusterId = clusterId;
}

@Override
Expand All @@ -422,6 +426,11 @@ public String getHostName() {
return hostName;
}

@Override
public String getClusterID() {
return clusterId;
}

@Override
public int getCommandPort() {
return commandPort;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class NodeInfo {
protected ResourceInfo availableResource;
protected NodeAttributesInfo nodeAttributesInfo;
private ResourceInfo totalResource;
private String clusterId;

public NodeInfo() {
} // JAXB needs this
Expand Down Expand Up @@ -135,6 +136,7 @@ public NodeInfo(RMNode ni, ResourceScheduler sched) {

// update node and containers resource utilization
this.resourceUtilization = new ResourceUtilizationInfo(ni);
this.clusterId = ni.getClusterID();
}

public String getRack() {
Expand Down Expand Up @@ -251,4 +253,8 @@ public void setTotalResource(ResourceInfo total) {
public ResourceInfo getTotalResource() {
return this.totalResource;
}

public String getClusterId() {
return clusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
Expand Down Expand Up @@ -120,6 +121,7 @@ private static class MockRMNodeImpl implements RMNode {
private ResourceUtilization nodeUtilization;
private Resource physicalResource;
private RMContext rmContext;
private String clusterId;

MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
Expand All @@ -141,6 +143,7 @@ private static class MockRMNodeImpl implements RMNode {
this.containersUtilization = containersUtilization;
this.nodeUtilization = nodeUtilization;
this.physicalResource = pPhysicalResource;
this.clusterId = YarnConfiguration.DEFAULT_RM_CLUSTER_ID;
}

public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Expand All @@ -155,6 +158,20 @@ public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
containersUtilization, nodeUtilization, pPhysicalResource);
this.rmContext = rmContext;
}

public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName,
NodeState state, Set<String> labels,
ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization, Resource pPhysicalResource,
String clusterId) {
this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport,
lastHealthReportTime, cmdPort, hostName, state, labels,
containersUtilization, nodeUtilization, pPhysicalResource);
this.clusterId = clusterId;
}

@Override
public NodeId getNodeID() {
return this.nodeId;
Expand All @@ -165,6 +182,11 @@ public String getHostName() {
return this.hostName;
}

@Override
public String getClusterID() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs some tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a test in TestNodesListManager

return clusterId;
}

@Override
public int getCommandPort() {
return this.cmdPort;
Expand Down Expand Up @@ -386,6 +408,26 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
nodeUtilization, physicalResource, rmContext);
}

private static RMNode buildRMNode(int rack, final Resource perNode,
NodeState state, String httpAddr, int hostnum, String hostName, int port,
Set<String> labels, ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization, Resource physicalResource,
String clusterId) {
final String rackName = "rack" + rack;
final int nid = hostnum;
final String nodeAddr = hostName + ":" + nid;
if (hostName == null) {
hostName = "host" + nid;
}
final NodeId nodeID = NodeId.newInstance(hostName, port);

final String httpAddress = httpAddr;
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName,
healthReport, 0, nid, hostName, state, labels, containersUtilization,
nodeUtilization, physicalResource, clusterId);
}

public static RMNode nodeInfo(int rack, final Resource perNode,
NodeState state) {
return buildRMNode(rack, perNode, state, "N/A");
Expand Down Expand Up @@ -419,4 +461,10 @@ public static RMNode newNodeInfo(int rack, final Resource perNode,
return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum,
hostName, port, null, null, null, null, rmContext);
}

public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName, int port, String clusterID) {
return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port,
null, null, null, null, clusterID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,18 @@ public boolean matches(AbstractEvent argument) {
};
}

@Test
public void testClusterId() throws Exception {

YarnConfiguration conf = new YarnConfiguration();
conf.setStrings(YarnConfiguration.RM_CLUSTER_ID, "c1");

RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "Host1", 1234);
RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
1, "Host2", 1234, conf.get(YarnConfiguration.RM_CLUSTER_ID));
Assert.assertEquals("yarn_cluster", rmnode1.getClusterID());
Assert.assertEquals("c1", rmnode2.getClusterID());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ protected void render(Block html) {
setTitle("Nodes");

TBODY<TABLE<Hamlet>> tbody = html.table("#nodes").thead().tr()
.th(".subCluster", "SubCluster")
.th(".nodelabels", "Node Labels")
.th(".rack", "Rack")
.th(".state", "Node State")
Expand All @@ -81,6 +82,7 @@ protected void render(Block html) {
int usedMemory = (int) info.getUsedMemory();
int availableMemory = (int) info.getAvailableMemory();
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
row.td().__(info.getClusterId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you attach a screenshot with the end result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I lost the screenshot

row.td().__(StringUtils.join(",", info.getNodeLabels())).__();
row.td().__(info.getRack()).__();
row.td().__(info.getState()).__();
Expand Down