Skip to content

Commit 0461a07

Browse files
committed
YARN-10475: Scale RM-NM heartbeat interval based on node utilization. Contributed by Jim Brennan (Jim_Brennan).
1 parent deea5d8 commit 0461a07

File tree

14 files changed

+417
-12
lines changed

14 files changed

+417
-12
lines changed

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,13 @@ public boolean isUpdatedCapability() {
246246
@Override
247247
public void resetUpdatedCapability() {
248248
}
249+
250+
@Override
251+
public long calculateHeartBeatInterval(
252+
long defaultInterval, long minInterval, long maxInterval,
253+
float speedupFactor, float slowdownFactor) {
254+
return defaultInterval;
255+
}
249256
}
250257

251258
public static RMNode newNodeInfo(String rackName, String hostName,

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,11 @@ public boolean isUpdatedCapability() {
231231
@Override
232232
public void resetUpdatedCapability() {
233233
}
234+
235+
@Override
236+
public long calculateHeartBeatInterval(
237+
long defaultInterval, long minInterval, long maxInterval,
238+
float speedupFactor, float slowdownFactor) {
239+
return defaultInterval;
240+
}
234241
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,30 @@ public static boolean isAclEnabled(Configuration conf) {
688688
RM_PREFIX + "nodemanagers.heartbeat-interval-ms";
689689
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS = 1000;
690690

691+
/** Enable Heartbeat Interval Scaling based on CPU utilization. */
692+
public static final String RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE =
693+
RM_PREFIX + "nodemanagers.heartbeat-interval-scaling-enable";
694+
public static final boolean
695+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE = false;
696+
697+
public static final String RM_NM_HEARTBEAT_INTERVAL_MIN_MS =
698+
RM_PREFIX + "nodemanagers.heartbeat-interval-min-ms";
699+
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS = 1000;
700+
701+
public static final String RM_NM_HEARTBEAT_INTERVAL_MAX_MS =
702+
RM_PREFIX + "nodemanagers.heartbeat-interval-max-ms";
703+
public static final long DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS = 1000;
704+
705+
public static final String RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR =
706+
RM_PREFIX + "nodemanagers.heartbeat-interval-speedup-factor";
707+
public static final float
708+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR = 1.0f;
709+
710+
public static final String RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR =
711+
RM_PREFIX + "nodemanagers.heartbeat-interval-slowdown-factor";
712+
public static final float
713+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
714+
691715
/** Number of worker threads that write the history data. */
692716
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
693717
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,56 @@
860860
<value>1000</value>
861861
</property>
862862

863+
<property>
864+
<description>Enables heart-beat interval scaling. The NodeManager
865+
heart-beat interval will scale based on the difference between the CPU
866+
utilization on the node and the cluster-wide average CPU utilization.
867+
</description>
868+
<name>
869+
yarn.resourcemanager.nodemanagers.heartbeat-interval-scaling-enable
870+
</name>
871+
<value>false</value>
872+
</property>
873+
874+
<property>
875+
<description>If heart-beat interval scaling is enabled, this is the
876+
minimum heart-beat interval in milliseconds.
877+
</description>
878+
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-min-ms</name>
879+
<value>1000</value>
880+
</property>
881+
882+
<property>
883+
<description>If heart-beat interval scaling is enabled, this is the
884+
maximum heart-beat interval in milliseconds.</description>
885+
<name>yarn.resourcemanager.nodemanagers.heartbeat-interval-max-ms</name>
886+
<value>1000</value>
887+
</property>
888+
889+
<property>
890+
<description>If heart-beat interval scaling is enabled, this controls
891+
the degree of adjustment when speeding up heartbeat intervals.
892+
At 1.0, 20% less than average CPU utilization will result in a 20%
893+
decrease in heartbeat interval.
894+
</description>
895+
<name>
896+
yarn.resourcemanager.nodemanagers.heartbeat-interval-speedup-factor
897+
</name>
898+
<value>1.0</value>
899+
</property>
900+
901+
<property>
902+
<description>If heart-beat interval scaling is enabled, this controls
903+
the degree of adjustment when slowing down heartbeat intervals.
904+
At 1.0, 20% greater than average CPU utilization will result in a 20%
905+
increase in heartbeat interval.
906+
</description>
907+
<name>
908+
yarn.resourcemanager.nodemanagers.heartbeat-interval-slowdown-factor
909+
</name>
910+
<value>1.0</value>
911+
</property>
912+
863913
<property>
864914
<description>The minimum allowed version of a connecting nodemanager. The valid values are
865915
NONE (no version checking), EqualToRM (the nodemanager's version is equal to

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,14 @@ public RefreshNodesResourcesResponse refreshNodesResources(
730730
// refresh dynamic resource in ResourceTrackerService
731731
this.rm.getRMContext().getResourceTrackerService().
732732
updateDynamicResourceConfiguration(newConf);
733+
734+
// Update our heartbeat configuration as well
735+
Configuration ysconf =
736+
getConfiguration(new Configuration(false),
737+
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
738+
this.rm.getRMContext().getResourceTrackerService()
739+
.updateHeartBeatConfiguration(ysconf);
740+
733741
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
734742
"AdminService");
735743
return response;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
3333
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
3434
import org.apache.hadoop.metrics2.lib.MutableRate;
35+
import org.apache.hadoop.yarn.api.records.Resource;
3536
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
3637

3738
@InterfaceAudience.Private
@@ -53,6 +54,8 @@ public class ClusterMetrics {
5354
private MutableRate aMContainerAllocationDelay;
5455
@Metric("Memory Utilization") MutableGaugeLong utilizedMB;
5556
@Metric("Vcore Utilization") MutableGaugeLong utilizedVirtualCores;
57+
@Metric("Memory Capability") MutableGaugeLong capabilityMB;
58+
@Metric("Vcore Capability") MutableGaugeLong capabilityVirtualCores;
5659

5760
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
5861
"Metrics for the Yarn Cluster");
@@ -83,7 +86,7 @@ private static void registerMetrics() {
8386
}
8487

8588
@VisibleForTesting
86-
synchronized static void destroy() {
89+
public synchronized static void destroy() {
8790
isInitialized.set(false);
8891
INSTANCE = null;
8992
}
@@ -195,6 +198,28 @@ public void addAMRegisterDelay(long delay) {
195198
aMRegisterDelay.add(delay);
196199
}
197200

201+
public long getCapabilityMB() {
202+
return capabilityMB.value();
203+
}
204+
205+
public long getCapabilityVirtualCores() {
206+
return capabilityVirtualCores.value();
207+
}
208+
209+
public void incrCapability(Resource res) {
210+
if (res != null) {
211+
capabilityMB.incr(res.getMemorySize());
212+
capabilityVirtualCores.incr(res.getVirtualCores());
213+
}
214+
}
215+
216+
public void decrCapability(Resource res) {
217+
if (res != null) {
218+
capabilityMB.decr(res.getMemorySize());
219+
capabilityVirtualCores.decr(res.getVirtualCores());
220+
}
221+
}
222+
198223
public void addAMContainerAllocationDelay(long delay) {
199224
aMContainerAllocationDelay.add(delay);
200225
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

Lines changed: 94 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.apache.hadoop.yarn.api.records.NodeAttribute;
5656
import org.apache.hadoop.yarn.conf.YarnConfiguration;
5757
import org.apache.hadoop.yarn.exceptions.YarnException;
58-
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
5958
import org.apache.hadoop.yarn.factories.RecordFactory;
6059
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
6160
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -114,6 +113,13 @@ public class ResourceTrackerService extends AbstractService implements
114113
private final WriteLock writeLock;
115114

116115
private long nextHeartBeatInterval;
116+
private boolean heartBeatIntervalScalingEnable;
117+
private long heartBeatIntervalMin;
118+
private long heartBeatIntervalMax;
119+
private float heartBeatIntervalSpeedupFactor;
120+
private float heartBeatIntervalSlowdownFactor;
121+
122+
117123
private Server server;
118124
private InetSocketAddress resourceTrackerAddress;
119125
private String minimumNodeManagerVersion;
@@ -157,14 +163,6 @@ protected void serviceInit(Configuration conf) throws Exception {
157163
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
158164

159165
RackResolver.init(conf);
160-
nextHeartBeatInterval =
161-
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
162-
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
163-
if (nextHeartBeatInterval <= 0) {
164-
throw new YarnRuntimeException("Invalid Configuration. "
165-
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS
166-
+ " should be larger than 0.");
167-
}
168166

169167
checkIpHostnameInRegistration = conf.getBoolean(
170168
YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
@@ -188,7 +186,7 @@ protected void serviceInit(Configuration conf) throws Exception {
188186
isDelegatedCentralizedNodeLabelsConf =
189187
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
190188
}
191-
189+
updateHeartBeatConfiguration(conf);
192190
loadDynamicResourceConfiguration(conf);
193191
decommissioningWatcher.init(conf);
194192
super.serviceInit(conf);
@@ -233,6 +231,84 @@ public void updateDynamicResourceConfiguration(
233231
}
234232
}
235233

234+
/**
235+
* Update HeartBeatConfiguration with new configuration.
236+
* @param conf Yarn Configuration
237+
*/
238+
public void updateHeartBeatConfiguration(Configuration conf) {
239+
this.writeLock.lock();
240+
try {
241+
nextHeartBeatInterval =
242+
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
243+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
244+
heartBeatIntervalScalingEnable =
245+
conf.getBoolean(
246+
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE,
247+
YarnConfiguration.
248+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SCALING_ENABLE);
249+
heartBeatIntervalMin =
250+
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MIN_MS,
251+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MIN_MS);
252+
heartBeatIntervalMax =
253+
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MAX_MS,
254+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MAX_MS);
255+
heartBeatIntervalSpeedupFactor =
256+
conf.getFloat(
257+
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR,
258+
YarnConfiguration.
259+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR);
260+
heartBeatIntervalSlowdownFactor =
261+
conf.getFloat(
262+
YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR,
263+
YarnConfiguration.
264+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR);
265+
266+
if (nextHeartBeatInterval <= 0) {
267+
LOG.warn("HeartBeat interval: " + nextHeartBeatInterval
268+
+ " must be greater than 0, using default.");
269+
nextHeartBeatInterval =
270+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
271+
}
272+
273+
if (heartBeatIntervalScalingEnable) {
274+
if (heartBeatIntervalMin <= 0
275+
|| heartBeatIntervalMin > heartBeatIntervalMax
276+
|| nextHeartBeatInterval < heartBeatIntervalMin
277+
|| nextHeartBeatInterval > heartBeatIntervalMax) {
278+
LOG.warn("Invalid NM Heartbeat Configuration. "
279+
+ "Required: 0 < minimum <= interval <= maximum. Got: 0 < "
280+
+ heartBeatIntervalMin + " <= "
281+
+ nextHeartBeatInterval + " <= "
282+
+ heartBeatIntervalMax
283+
+ " Setting min and max to configured interval.");
284+
heartBeatIntervalMin = nextHeartBeatInterval;
285+
heartBeatIntervalMax = nextHeartBeatInterval;
286+
}
287+
if (heartBeatIntervalSpeedupFactor < 0
288+
|| heartBeatIntervalSlowdownFactor < 0) {
289+
LOG.warn(
290+
"Heartbeat scaling factors must be >= 0 "
291+
+ " SpeedupFactor:" + heartBeatIntervalSpeedupFactor
292+
+ " SlowdownFactor:" + heartBeatIntervalSlowdownFactor
293+
+ ". Using Defaults");
294+
heartBeatIntervalSlowdownFactor =
295+
YarnConfiguration.
296+
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR;
297+
heartBeatIntervalSpeedupFactor =
298+
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SPEEDUP_FACTOR;
299+
}
300+
LOG.info("Heartbeat Scaling Configuration: "
301+
+ " defaultInterval:" + nextHeartBeatInterval
302+
+ " minimumInterval:" + heartBeatIntervalMin
303+
+ " maximumInterval:" + heartBeatIntervalMax
304+
+ " speedupFactor:" + heartBeatIntervalSpeedupFactor
305+
+ " slowdownFactor:" + heartBeatIntervalSlowdownFactor);
306+
}
307+
} finally {
308+
this.writeLock.unlock();
309+
}
310+
}
311+
236312
@Override
237313
protected void serviceStart() throws Exception {
238314
super.serviceStart();
@@ -629,10 +705,17 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
629705
}
630706

631707
// Heartbeat response
708+
long newInterval = nextHeartBeatInterval;
709+
if (heartBeatIntervalScalingEnable) {
710+
newInterval = rmNode.calculateHeartBeatInterval(
711+
nextHeartBeatInterval, heartBeatIntervalMin,
712+
heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
713+
heartBeatIntervalSlowdownFactor);
714+
}
632715
NodeHeartbeatResponse nodeHeartBeatResponse =
633716
YarnServerBuilderUtils.newNodeHeartbeatResponse(
634717
getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
635-
NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
718+
NodeAction.NORMAL, null, null, null, null, newInterval);
636719
rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
637720

638721
populateKeys(request, nodeHeartBeatResponse);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,4 +212,8 @@ public interface RMNode {
212212
* @return all node attributes as a Set.
213213
*/
214214
Set<NodeAttribute> getAllNodeAttributes();
215+
216+
long calculateHeartBeatInterval(long defaultInterval,
217+
long minInterval, long maxInterval, float speedupFactor,
218+
float slowdownFactor);
215219
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,48 @@ public void resetLastNodeHeartBeatResponse() {
716716
}
717717
}
718718

719+
@Override
720+
public long calculateHeartBeatInterval(long defaultInterval, long minInterval,
721+
long maxInterval, float speedupFactor, float slowdownFactor) {
722+
723+
long newInterval = defaultInterval;
724+
725+
ClusterMetrics metrics = ClusterMetrics.getMetrics();
726+
float clusterUtil = metrics.getUtilizedVirtualCores()
727+
/ Math.max(1.0f, metrics.getCapabilityVirtualCores());
728+
729+
if (this.nodeUtilization != null && this.getPhysicalResource() != null) {
730+
// getCPU() returns utilization normalized to 1 cpu. getVirtualCores() on
731+
// a physicalResource returns number of physical cores. So,
732+
// nodeUtil will be CPU utilization of entire node.
733+
float nodeUtil = this.nodeUtilization.getCPU()
734+
/ Math.max(1.0f, this.getPhysicalResource().getVirtualCores());
735+
736+
// sanitize
737+
nodeUtil = Math.min(1.0f, Math.max(0.0f, nodeUtil));
738+
clusterUtil = Math.min(1.0f, Math.max(0.0f, clusterUtil));
739+
740+
if (nodeUtil > clusterUtil) {
741+
// Slow down - 20% more CPU utilization means slow down by 20% * factor
742+
newInterval = (long) (defaultInterval
743+
* (1.0f + (nodeUtil - clusterUtil) * slowdownFactor));
744+
} else {
745+
// Speed up - 20% less CPU utilization means speed up by 20% * factor
746+
newInterval = (long) (defaultInterval
747+
* (1.0f - (clusterUtil - nodeUtil) * speedupFactor));
748+
}
749+
newInterval =
750+
Math.min(maxInterval, Math.max(minInterval, newInterval));
751+
752+
if (LOG.isDebugEnabled()) {
753+
LOG.debug("Setting heartbeatinterval to: " + newInterval
754+
+ " node:" + this.nodeId + " nodeUtil: " + nodeUtil
755+
+ " clusterUtil: " + clusterUtil);
756+
}
757+
}
758+
return newInterval;
759+
}
760+
719761
public void handle(RMNodeEvent event) {
720762
LOG.debug("Processing {} of type {}", event.getNodeId(), event.getType());
721763
writeLock.lock();

0 commit comments

Comments
 (0)