Skip to content

Commit

Permalink
YARN-9425. Make initialDelay configurable for FederationStateStoreSer…
Browse files Browse the repository at this point in the history
…vice#scheduledExecutorService (#4731). Contributed by  groot and Shen Yinjie.

Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
  • Loading branch information
ashutoshcipher authored Aug 21, 2022
1 parent 7f176d0 commit c294a41
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3920,6 +3920,13 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/";

public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY =
FEDERATION_PREFIX + "state-store.heartbeat.initial-delay";

// 30 secs
public static final int
DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = 30;

public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3624,6 +3624,16 @@
<name>yarn.federation.enabled</name>
<value>false</value>
</property>
<property>
<description>
Initial delay for federation state-store heartbeat service. Value is followed by a unit
specifier: ns, us, ms, s, m, h, d for nanoseconds, microseconds, milliseconds, seconds,
minutes, hours, days respectively. Values should provide units,
but seconds are assumed
</description>
<name>yarn.federation.state-store.heartbeat.initial-delay</name>
<value>30s</value>
</property>
<property>
<description>
Machine list file to be loaded by the FederationSubCluster Resolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class FederationStateStoreService extends AbstractService
private FederationStateStore stateStoreClient = null;
private SubClusterId subClusterId;
private long heartbeatInterval;
private long heartbeatInitialDelay;
private RMContext rmContext;

public FederationStateStoreService(RMContext rmContext) {
Expand Down Expand Up @@ -126,10 +127,24 @@ protected void serviceInit(Configuration conf) throws Exception {
heartbeatInterval = conf.getLong(
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);

if (heartbeatInterval <= 0) {
heartbeatInterval =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS;
}

heartbeatInitialDelay = conf.getTimeDuration(
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
TimeUnit.SECONDS);

if (heartbeatInitialDelay <= 0) {
LOG.warn("{} configured value is wrong, must be > 0; using default value of {}",
YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY);
heartbeatInitialDelay =
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY;
}
LOG.info("Initialized federation membership service.");

super.serviceInit(conf);
Expand Down Expand Up @@ -206,9 +221,9 @@ private void registerAndInitializeHeartbeat() {
scheduledExecutorService =
HadoopExecutors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS);
LOG.info("Started federation membership heartbeat with interval: {}",
heartbeatInterval);
heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS);
LOG.info("Started federation membership heartbeat with interval: {} and initial delay: {}",
heartbeatInterval, heartbeatInitialDelay);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
Expand Down Expand Up @@ -173,4 +174,37 @@ private String checkSubClusterInfo(SubClusterState state)
return response.getCapability();
}

@Test
public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Exception {
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());

GenericTestUtils.LogCapturer logCapture =
GenericTestUtils.LogCapturer.captureLogs(FederationStateStoreService.LOG);

final MockRM rm = new MockRM(conf);

// Initially there should be no entry for the sub-cluster
rm.init(conf);
stateStore = rm.getFederationStateStoreService().getStateStoreClient();
GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
Assert.assertNull(response);

// Validate if sub-cluster is registered
rm.start();
String capability = checkSubClusterInfo(SubClusterState.SC_NEW);
Assert.assertTrue(capability.isEmpty());

// Heartbeat to see if sub-cluster transitions to running
FederationStateStoreHeartbeat storeHeartbeat =
rm.getFederationStateStoreService().getStateStoreHeartbeatThread();
storeHeartbeat.run();
capability = checkSubClusterInfo(SubClusterState.SC_RUNNING);
checkClusterMetricsInfo(capability, 0);

Assert.assertTrue(logCapture.getOutput().contains(
"Started federation membership heartbeat with interval: 300 and initial delay: 10"));
rm.stop();
}
}

0 comments on commit c294a41

Please sign in to comment.