Skip to content

Commit c5764a9

Browse files
committed
YARN-11878. AsyncDispatcher event queue backlog with millions of STATUS_UPDATE events
1 parent be548a4 commit c5764a9

File tree

1 file changed

+13
-7
lines changed
  • hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode

1 file changed

+13
-7
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
3737

3838
import org.apache.commons.collections4.keyvalue.DefaultMapEntry;
39+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3940
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
4041
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
4142
import org.slf4j.Logger;
@@ -139,6 +140,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
139140
private Integer decommissioningTimeout;
140141

141142
private long timeStamp;
143+
private final boolean opportunisticContainersEnabled;
142144
/* Aggregated resource utilization for the containers. */
143145
private ResourceUtilization containersUtilization;
144146
/* Resource utilization for the node. */
@@ -407,6 +409,8 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
407409
this.healthReport = "Healthy";
408410
this.lastHealthReportTime = System.currentTimeMillis();
409411
this.nodeManagerVersion = nodeManagerVersion;
412+
this.opportunisticContainersEnabled = YarnConfiguration
413+
.isOpportunisticContainerAllocationEnabled(context.getYarnConfiguration());
410414
this.timeStamp = 0;
411415
// If physicalResource is not available, capability is a reasonable guess
412416
this.physicalResource = physResource==null ? capability : physResource;
@@ -1583,7 +1587,6 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
15831587
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
15841588
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
15851589
int numRemoteRunningContainers = 0;
1586-
final Resource allocatedResource = Resource.newInstance(Resources.none());
15871590

15881591
for (ContainerStatus remoteContainer : containerStatuses) {
15891592
ContainerId containerId = remoteContainer.getContainerId();
@@ -1654,15 +1657,18 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
16541657
.unregister(new AllocationExpirationInfo(containerId));
16551658
}
16561659

1657-
if ((remoteContainer.getState() == ContainerState.RUNNING ||
1658-
remoteContainer.getState() == ContainerState.NEW) &&
1659-
remoteContainer.getCapability() != null) {
1660-
Resources.addTo(allocatedResource, remoteContainer.getCapability());
1660+
if (opportunisticContainersEnabled){
1661+
Resource allocatedResource = Resource.newInstance(Resources.none());
1662+
Resource capability = remoteContainer.getCapability();
1663+
if ((remoteContainer.getState() == ContainerState.RUNNING ||
1664+
remoteContainer.getState() == ContainerState.NEW) &&
1665+
remoteContainer.getCapability() != null) {
1666+
Resources.addTo(allocatedResource, remoteContainer.getCapability());
1667+
}
1668+
allocatedContainerResource = allocatedResource;
16611669
}
16621670
}
16631671

1664-
allocatedContainerResource = allocatedResource;
1665-
16661672
List<ContainerStatus> lostContainers =
16671673
findLostContainers(numRemoteRunningContainers, containerStatuses);
16681674
for (ContainerStatus remoteContainer : lostContainers) {

0 commit comments

Comments
 (0)