HBASE-26180 Introduce a initial refresh interval for RpcConnectionReg… #3601

Merged: 1 commit, Aug 25, 2021
@@ -90,8 +90,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
private final RegistryEndpointsRefresher registryEndpointRefresher;

protected AbstractRpcBasedConnectionRegistry(Configuration conf,
- String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
- String minRefreshIntervalSecsConfigName) throws IOException {
+ String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
+ String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
+ throws IOException {
this.hedgedReadFanOut =
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
@@ -103,8 +104,9 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
populateStubs(getBootstrapNodes(conf));
// could return null here is refresh interval is less than zero
- registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
- refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
+ registryEndpointRefresher =
+ RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
+ refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
}

protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
@@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout";

+ public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
+ "hbase.client.master_registry.initial_refresh_delay_secs";
+
public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";

@@ -85,7 +88,7 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
}

MasterRegistry(Configuration conf) throws IOException {
- super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+ super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
}

@@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher {

private final Thread thread;
private final Refresher refresher;
+ private final long initialDelayMs;
private final long periodicRefreshMs;
private final long minTimeBetweenRefreshesMs;

@@ -56,9 +57,20 @@ synchronized void stop() {
notifyAll();
}

+ private long getRefreshIntervalMs(boolean firstRefresh) {
+ if (refreshNow) {
+ return minTimeBetweenRefreshesMs;
+ }
+ if (firstRefresh) {
Contributor

I think this works fine for a large cluster once we have populated the stubs for all the bootstrap nodes. But for a smaller cluster (<=10), populateStubs() returns everything, so it's likely that even after the first refresh, all subsequent refreshes will hit the same set of nodes at the same time.

It looks to me like the right fix is to add jitter to the refresh time. Jitter is a random sleep in (0, n) that spreads out the calls, and it would apply to every refresh, not just the first one. WDYT?

Contributor Author

I think jitter solves a different problem. It is always good to add jitter to periodic requests to reduce the chance that they all fire at the same time.

As for the purpose of this issue, let me paste the description here first to see if it helps:

Introduce an initial refresh interval for RpcConnectionRegistry so we can get the new list soon after we connect to the cluster.
As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is possible that different end users will configure the same machine, which would overload that machine. So we should have a shorter delay for the initial refresh, to let users quickly switch to the bootstrap nodes we want them to connect to.

Feel free to ask if you have other concerns.

Contributor

Oh, sorry, I interpreted it as something else entirely. I thought you were talking about all requests hitting the servers at the same time, which is why I suggested jitter. My bad!
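
For reference, the jitter idea raised in this thread could be sketched roughly as below. This is not part of the PR: `JitterSketch`, `withJitter`, and `maxJitterMs` are hypothetical names used only for illustration, and the merged change does not add jitter.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not HBase code: spreads periodic calls by adding a random offset.
final class JitterSketch {
  /** Returns baseMs plus a random offset in [0, maxJitterMs); no offset if maxJitterMs <= 0. */
  static long withJitter(long baseMs, long maxJitterMs) {
    if (maxJitterMs <= 0) {
      return baseMs;
    }
    // nextLong(bound) returns a value in [0, bound) and requires bound > 0
    return baseMs + ThreadLocalRandom.current().nextLong(maxJitterMs);
  }
}
```

Applied to every wait, not only the first, this would keep clients that started together from refreshing in lockstep, which is the separate problem the reviewer had in mind.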

+ return initialDelayMs;
Contributor

One minor edge case is initialDelayMs > periodicRefreshMs (for example, through misconfiguration). Does that need to be handled?

Alternatively, we could make initialDelayMs = periodicRefreshMs/4 or something like that, i.e. a function of periodicRefreshMs rather than a separate config. Whatever works for you; I don't have a preference.

Contributor Author

I think a new config option is needed, but its default value could be periodicRefreshMs/4 or /5. If users set it explicitly, we will use their value. WDYT?
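
One way to read the two proposals in this thread (a default derived from periodicRefreshMs, and a guard against initialDelayMs > periodicRefreshMs) is sketched below. It is only an illustration under assumptions: `InitialDelaySketch` and `resolveInitialDelayMs` are hypothetical names, a null argument stands in for "not configured", the divisor 4 is one of the values floated above, and the clamp to periodicRefreshMs is an assumed policy that the diff on this page does not implement.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not HBase code.
final class InitialDelaySketch {
  /**
   * @param configuredSecs explicit setting in seconds, or null if the user did not set one
   * @param periodicRefreshMs periodic refresh interval in milliseconds, assumed positive
   */
  static long resolveInitialDelayMs(Long configuredSecs, long periodicRefreshMs) {
    long delayMs = (configuredSecs != null)
      ? TimeUnit.SECONDS.toMillis(configuredSecs) // explicit value wins
      : periodicRefreshMs / 4;                    // assumed default: a fraction of the interval
    // Assumed policy: never below 1 ms, never above the periodic interval.
    return Math.max(1, Math.min(delayMs, periodicRefreshMs));
  }
}
```

Keeping both the fallback and the explicit value in milliseconds before clamping avoids mixing units between the two branches.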

+ }
+ return periodicRefreshMs;
+ }
+
// The main loop for the refresh thread.
private void mainLoop() {
long lastRefreshTime = EnvironmentEdgeManager.currentTime();
+ boolean firstRefresh = true;
for (;;) {
synchronized (this) {
for (;;) {
@@ -68,9 +80,12 @@ private void mainLoop() {
}
// if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
// otherwise wait until periodicRefreshMs elapsed
- long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
+ long waitTime = getRefreshIntervalMs(firstRefresh) -
(EnvironmentEdgeManager.currentTime() - lastRefreshTime);
if (waitTime <= 0) {
+ // we are going to refresh, reset this flag
+ firstRefresh = false;
+ refreshNow = false;
break;
}
try {
@@ -81,8 +96,6 @@ private void mainLoop() {
continue;
}
}
- // we are going to refresh, reset this flag
- refreshNow = false;
}
LOG.debug("Attempting to refresh registry end points");
try {
@@ -104,8 +117,9 @@ public interface Refresher {
void refresh() throws IOException;
}

- private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
- Refresher refresher) {
+ private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
+ long minTimeBetweenRefreshesMs, Refresher refresher) {
+ this.initialDelayMs = initialDelayMs;
this.periodicRefreshMs = periodicRefreshMs;
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
this.refresher = refresher;
@@ -129,16 +143,19 @@ synchronized void refreshNow() {
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
* refreshing of endpoints.
*/
- static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
- String minIntervalSecsConfigName, Refresher refresher) {
+ static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
+ String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
long periodicRefreshMs = TimeUnit.SECONDS
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
if (periodicRefreshMs <= 0) {
return null;
}
+ long initialDelayMs = Math.max(1,
+ TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
- return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
+ return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
+ minTimeBetweenRefreshesMs, refresher);
}
}
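
A note on units in the initial-delay default in create(), read literally from the hunk: the fallback passed to conf.getLong is periodicRefreshMs / 10, a millisecond quantity, but the result is then converted with TimeUnit.SECONDS.toMillis as if it were seconds. Assuming getLong returns the supplied default unchanged when the key is unset, the effective default would be 100 times the periodic interval rather than 1/10 of it. The sketch below only reproduces that arithmetic next to the documented intent; `DefaultUnitsSketch` and its method names are hypothetical, and the 300-second interval in the note after it is an example value only.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the arithmetic, not HBase code.
final class DefaultUnitsSketch {
  /** Mirrors the expression in create() for the case where the key is unset. */
  static long defaultAsWrittenMs(long periodicRefreshMs) {
    long fallback = periodicRefreshMs / 10; // milliseconds, but converted below as seconds
    return Math.max(1, TimeUnit.SECONDS.toMillis(fallback));
  }

  /** The intent stated in the Javadoc: 1/10 of the periodic refresh interval. */
  static long defaultAsDocumentedMs(long periodicRefreshMs) {
    return Math.max(1, periodicRefreshMs / 10);
  }
}
```

With a periodic interval of 300 seconds (300,000 ms), defaultAsWrittenMs gives 30,000,000 ms (about 8.3 hours), while defaultAsDocumentedMs gives 30,000 ms.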
@@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
/** Configuration key that controls the fan out of requests **/
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";

+ /**
+ * As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
+ * possible that different end users will configure the same machine which makes the machine over
+ * load. So we should have a shorter delay for the initial refresh, to let users quickly switch to
+ * the bootstrap nodes we want them to connect to.
+ * <p/>
+ * The default value for initial refresh delay is 1/10 of periodic refresh interval.
+ */
+ public static final String INITIAL_REFRESH_DELAY_SECS =
+ "hbase.client.bootstrap.initial_refresh_delay_secs";
+
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.bootstrap.refresh_interval_secs";

@@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
private static final char ADDRS_CONF_SEPARATOR = ',';

RpcConnectionRegistry(Configuration conf) throws IOException {
- super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
+ super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
+ MIN_SECS_BETWEEN_REFRESHES);
}

@Override
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.hbase.client;

+ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

- import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);

+ private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
+ "hbase.test.registry.initial.delay.secs";
private static final String INTERVAL_SECS_CONFIG_NAME =
"hbase.test.registry.refresh.interval.secs";
private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
@@ -75,33 +77,45 @@ private void refresh() {
callTimestamps.add(EnvironmentEdgeManager.currentTime());
}

- private void createRefresher(long intervalSecs, long minIntervalSecs) {
+ private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
+ conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
- refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
- MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
+ refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
+ INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
}

@Test
public void testDisableRefresh() {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
- MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
+ INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
}

@Test
- public void testPeriodicEndpointRefresh() throws IOException {
+ public void testInitialDelay() throws InterruptedException {
+ createRefresher(1, 10, 0);
+ // Wait for 2 seconds to see that at least 1 refresh have been made since the initial delay is 1
+ // seconds
+ Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
+ // Sleep more 5 seconds to make sure we have not made new calls since the interval is 10 seconds
+ Thread.sleep(5000);
+ assertEquals(1, refreshCallCounter.get());
+ }
+
+ @Test
+ public void testPeriodicMasterEndPointRefresh() {
// Refresh every 1 second.
- createRefresher(1, 0);
+ createRefresher(1, 1, 0);
// Wait for > 3 seconds to see that at least 3 refresh have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
}

@Test
- public void testDurationBetweenRefreshes() throws IOException {
+ public void testDurationBetweenRefreshes() {
// Disable periodic refresh
// A minimum duration of 1s between refreshes
- createRefresher(Integer.MAX_VALUE, 1);
+ createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
// Issue a ton of manual refreshes.
for (int i = 0; i < 10000; i++) {
refresher.refreshNow();
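
The timing that testInitialDelay relies on can be checked with a small model. This is a simplified sketch, not the refresher itself: it assumes each refresh completes instantly, ignores refreshNow(), and uses the hypothetical names `RefreshScheduleSketch` and `refreshTimes`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the schedule: first wait is the initial delay, later waits are periodic.
final class RefreshScheduleSketch {
  /** Returns the times (ms since start) at which refreshes fire, up to horizonMs inclusive. */
  static List<Long> refreshTimes(long initialDelayMs, long periodicMs, long horizonMs) {
    if (initialDelayMs < 0 || periodicMs <= 0) {
      throw new IllegalArgumentException("initial delay must be >= 0 and period must be > 0");
    }
    List<Long> times = new ArrayList<>();
    long lastRefresh = 0;
    boolean firstRefresh = true;
    for (;;) {
      long next = lastRefresh + (firstRefresh ? initialDelayMs : periodicMs);
      if (next > horizonMs) {
        return times;
      }
      times.add(next);
      lastRefresh = next;
      firstRefresh = false; // only the first wait uses the initial delay
    }
  }
}
```

With an initial delay of 1 s and an interval of 10 s, the model yields refreshes at 1 s and 11 s, so exactly one refresh is expected at both the 2 s and 7 s marks, which is what the test asserts.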
@@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads {
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);

private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
+ private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
+ "hbase.test.refresh.initial.delay.secs";
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.refresh.interval.secs";
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
@@ -153,7 +155,8 @@ private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOE
Configuration conf = UTIL.getConfiguration();
conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
- REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
+ INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
+ MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {

@Override
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
@@ -173,6 +176,7 @@ public static void setUpBeforeClass() {
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class);
// disable refresh, we do not need to refresh in this test
+ conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
BOOTSTRAP_NODES = IntStream.range(0, 10)
@@ -58,6 +58,7 @@ public class TestRpcConnectionRegistry {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// allow refresh immediately so we will switch to use region servers soon.
+ UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
UTIL.startMiniCluster(3);
@@ -91,8 +92,8 @@ private void setMaxNodeCount(int count) {
@Test
public void testRegistryRPCs() throws Exception {
HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
- // wait until we switch to use region servers
- UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3);
+ // sleep 3 seconds, since our initial delay is 1 second, we should have refreshed the endpoints
+ Thread.sleep(3000);
assertThat(registry.getParsedServers(),
hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
