Skip to content

Commit e92f501

Browse files
authored
Merge branch 'HBASE-26067' into HBASE-26079
2 parents a66cecc + 1848381 commit e92f501

30 files changed

+556
-385
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
9090
private final RegistryEndpointsRefresher registryEndpointRefresher;
9191

9292
protected AbstractRpcBasedConnectionRegistry(Configuration conf,
93-
String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
94-
String minRefreshIntervalSecsConfigName) throws IOException {
93+
String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
94+
String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
95+
throws IOException {
9596
this.hedgedReadFanOut =
9697
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
9798
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
@@ -103,8 +104,9 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
103104
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
104105
populateStubs(getBootstrapNodes(conf));
105106
// could return null here if the refresh interval is less than or equal to zero
106-
registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
107-
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
107+
registryEndpointRefresher =
108+
RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
109+
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
108110
}
109111

110112
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
5959
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
6060
"hbase.client.master_registry.hedged.fanout";
6161

62+
public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
63+
"hbase.client.master_registry.initial_refresh_delay_secs";
64+
6265
public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
6366
"hbase.client.master_registry.refresh_interval_secs";
6467

@@ -85,7 +88,7 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
8588
}
8689

8790
MasterRegistry(Configuration conf) throws IOException {
88-
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
91+
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
8992
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
9093
}
9194

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher {
4545

4646
private final Thread thread;
4747
private final Refresher refresher;
48+
private final long initialDelayMs;
4849
private final long periodicRefreshMs;
4950
private final long minTimeBetweenRefreshesMs;
5051

@@ -56,9 +57,20 @@ synchronized void stop() {
5657
notifyAll();
5758
}
5859

60+
private long getRefreshIntervalMs(boolean firstRefresh) {
61+
if (refreshNow) {
62+
return minTimeBetweenRefreshesMs;
63+
}
64+
if (firstRefresh) {
65+
return initialDelayMs;
66+
}
67+
return periodicRefreshMs;
68+
}
69+
5970
// The main loop for the refresh thread.
6071
private void mainLoop() {
6172
long lastRefreshTime = EnvironmentEdgeManager.currentTime();
73+
boolean firstRefresh = true;
6274
for (;;) {
6375
synchronized (this) {
6476
for (;;) {
@@ -68,9 +80,12 @@ private void mainLoop() {
6880
}
6981
// if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
7082
// otherwise wait until initialDelayMs (for the first refresh) or periodicRefreshMs elapsed
71-
long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
83+
long waitTime = getRefreshIntervalMs(firstRefresh) -
7284
(EnvironmentEdgeManager.currentTime() - lastRefreshTime);
7385
if (waitTime <= 0) {
86+
// we are going to refresh, reset these flags
87+
firstRefresh = false;
88+
refreshNow = false;
7489
break;
7590
}
7691
try {
@@ -81,8 +96,6 @@ private void mainLoop() {
8196
continue;
8297
}
8398
}
84-
// we are going to refresh, reset this flag
85-
refreshNow = false;
8699
}
87100
LOG.debug("Attempting to refresh registry end points");
88101
try {
@@ -104,8 +117,9 @@ public interface Refresher {
104117
void refresh() throws IOException;
105118
}
106119

107-
private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
108-
Refresher refresher) {
120+
private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
121+
long minTimeBetweenRefreshesMs, Refresher refresher) {
122+
this.initialDelayMs = initialDelayMs;
109123
this.periodicRefreshMs = periodicRefreshMs;
110124
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
111125
this.refresher = refresher;
@@ -129,16 +143,19 @@ synchronized void refreshNow() {
129143
* {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
130144
* refreshing of endpoints.
131145
*/
132-
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
133-
String minIntervalSecsConfigName, Refresher refresher) {
146+
static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
147+
String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
134148
long periodicRefreshMs = TimeUnit.SECONDS
135149
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
136150
if (periodicRefreshMs <= 0) {
137151
return null;
138152
}
153+
long initialDelayMs = Math.max(1,
154+
TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
139155
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
140156
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
141157
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
142-
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
158+
return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
159+
minTimeBetweenRefreshesMs, refresher);
143160
}
144161
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
5151
/** Configuration key that controls the fan out of requests **/
5252
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
5353

54+
/**
55+
* As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
56+
* possible that different end users will configure the same machine, which could overload
57+
* it. So we should have a shorter delay for the initial refresh, to let users quickly switch to
58+
* the bootstrap nodes we want them to connect to.
59+
* <p/>
60+
* The default value for initial refresh delay is 1/10 of periodic refresh interval.
61+
*/
62+
public static final String INITIAL_REFRESH_DELAY_SECS =
63+
"hbase.client.bootstrap.initial_refresh_delay_secs";
64+
5465
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
5566
"hbase.client.bootstrap.refresh_interval_secs";
5667

@@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
6273
private static final char ADDRS_CONF_SEPARATOR = ',';
6374

6475
RpcConnectionRegistry(Configuration conf) throws IOException {
65-
super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
76+
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
77+
MIN_SECS_BETWEEN_REFRESHES);
6678
}
6779

6880
@Override

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNull;
2122
import static org.junit.Assert.assertTrue;
2223

23-
import java.io.IOException;
2424
import java.util.concurrent.CopyOnWriteArrayList;
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher {
4646
public static final HBaseClassTestRule CLASS_RULE =
4747
HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
4848

49+
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
50+
"hbase.test.registry.initial.delay.secs";
4951
private static final String INTERVAL_SECS_CONFIG_NAME =
5052
"hbase.test.registry.refresh.interval.secs";
5153
private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
@@ -75,33 +77,45 @@ private void refresh() {
7577
callTimestamps.add(EnvironmentEdgeManager.currentTime());
7678
}
7779

78-
private void createRefresher(long intervalSecs, long minIntervalSecs) {
80+
private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
81+
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
7982
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
8083
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
81-
refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
82-
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
84+
refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
85+
INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
8386
}
8487

8588
@Test
8689
public void testDisableRefresh() {
8790
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
8891
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
89-
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
92+
INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
9093
}
9194

9295
@Test
93-
public void testPeriodicEndpointRefresh() throws IOException {
96+
public void testInitialDelay() throws InterruptedException {
97+
createRefresher(1, 10, 0);
98+
// Wait for 2 seconds to see that at least 1 refresh has been made since the initial delay is 1
99+
// seconds
100+
Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
101+
// Sleep 5 more seconds to make sure we have not made new calls since the interval is 10 seconds
102+
Thread.sleep(5000);
103+
assertEquals(1, refreshCallCounter.get());
104+
}
105+
106+
@Test
107+
public void testPeriodicMasterEndPointRefresh() {
94108
// Refresh every 1 second.
95-
createRefresher(1, 0);
109+
createRefresher(1, 1, 0);
96110
// Wait for > 3 seconds to see that at least 3 refreshes have been made.
97111
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
98112
}
99113

100114
@Test
101-
public void testDurationBetweenRefreshes() throws IOException {
115+
public void testDurationBetweenRefreshes() {
102116
// Disable periodic refresh
103117
// A minimum duration of 1s between refreshes
104-
createRefresher(Integer.MAX_VALUE, 1);
118+
createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
105119
// Issue a ton of manual refreshes.
106120
for (int i = 0; i < 10000; i++) {
107121
refresher.refreshNow();

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads {
6969
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
7070

7171
private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
72+
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
73+
"hbase.test.refresh.initial.delay.secs";
7274
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
7375
"hbase.test.refresh.interval.secs";
7476
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
@@ -153,7 +155,8 @@ private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOE
153155
Configuration conf = UTIL.getConfiguration();
154156
conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
155157
return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
156-
REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
158+
INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
159+
MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
157160

158161
@Override
159162
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
@@ -173,6 +176,7 @@ public static void setUpBeforeClass() {
173176
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
174177
RpcClient.class);
175178
// disable refresh, we do not need to refresh in this test
179+
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
176180
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
177181
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
178182
BOOTSTRAP_NODES = IntStream.range(0, 10)

hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/BackupMasterStatusTmpl.jamon

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,19 @@ java.util.*;
2424
org.apache.hadoop.hbase.ServerName;
2525
org.apache.hadoop.hbase.ClusterMetrics;
2626
org.apache.hadoop.hbase.master.HMaster;
27-
org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
2827
</%import>
29-
<%java>
30-
MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
31-
</%java>
32-
3328
<%if (!master.isActiveMaster()) %>
3429
<%java>
35-
ServerName active_master =
36-
(masterAddressTracker == null) ? null : masterAddressTracker.getMasterAddress();
37-
assert active_master != null : "Failed to retrieve master's ServerName!";
38-
int infoPort = (masterAddressTracker == null) ? 0 : masterAddressTracker.getMasterInfoPort();
30+
ServerName active_master = master.getActiveMaster().orElse(null);
31+
assert active_master != null : "Failed to retrieve active master's ServerName!";
32+
int activeInfoPort = active_master == null ? 0 : master.getActiveMasterInfoPort();
3933
</%java>
4034
<div class="row inner_header">
4135
<div class="page-header">
4236
<h1>Backup Master <small><% master.getServerName().getHostname() %></small></h1>
4337
</div>
4438
</div>
45-
<h4>Current Active Master: <a href="//<% active_master.getHostname() %>:<% infoPort %>/master-status"
39+
<h4>Current Active Master: <a href="//<% active_master.getHostname() %>:<% activeInfoPort %>/master-status"
4640
target="_blank"><% active_master.getHostname() %></a><h4>
4741
<%else>
4842
<h2>Backup Masters</h2>
@@ -54,13 +48,11 @@ MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
5448
<th>Start Time</th>
5549
</tr>
5650
<%java>
57-
Collection<ServerName> backup_masters = master.getClusterMetricsWithoutCoprocessor(
58-
EnumSet.of(ClusterMetrics.Option.BACKUP_MASTERS)).getBackupMasterNames();
51+
Collection<ServerName> backup_masters = master.getBackupMasters();
5952
ServerName [] backupServerNames = backup_masters.toArray(new ServerName[backup_masters.size()]);
6053
Arrays.sort(backupServerNames);
6154
for (ServerName serverName : backupServerNames) {
62-
int infoPort = (masterAddressTracker == null) ? 0 : masterAddressTracker
63-
.getBackupMasterInfoPort(serverName);
55+
int infoPort = master.getBackupMasterInfoPort(serverName);
6456
</%java>
6557
<tr>
6658
<td><a href="//<% serverName.getHostname() %>:<% infoPort %>/master-status"

hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.hadoop.hbase;
1919

2020
import java.util.ArrayList;
21+
import java.util.Collections;
2122
import java.util.List;
22-
import java.util.Optional;
2323
import java.util.concurrent.ConcurrentNavigableMap;
2424
import java.util.concurrent.ThreadFactory;
2525
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -203,19 +203,19 @@ private void updateMetaLocation(String path, ZNodeOpType opType) {
203203
* @return List of HRegionLocations for meta replica(s), an empty list if the cache is empty.
204204
*
205205
*/
206-
public Optional<List<HRegionLocation>> getMetaRegionLocations() {
206+
public List<HRegionLocation> getMetaRegionLocations() {
207207
ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
208208
cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
209209
if (snapshot.isEmpty()) {
210210
// This could be possible if the master has not successfully initialized yet or meta region
211211
// is stuck in some weird state.
212-
return Optional.empty();
212+
return Collections.emptyList();
213213
}
214214
List<HRegionLocation> result = new ArrayList<>();
215215
// Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
216216
// ArrayValueCollection does not implement toArray().
217217
snapshot.values().forEach(location -> result.add(location));
218-
return Optional.of(result);
218+
return result;
219219
}
220220

221221
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import java.io.IOException;
21-
import java.net.InetSocketAddress;
2221
import java.net.SocketAddress;
2322
import java.security.PrivilegedExceptionAction;
2423
import org.apache.hadoop.conf.Configuration;
25-
import org.apache.hadoop.hbase.regionserver.HRegionServer;
2624
import org.apache.hadoop.hbase.security.User;
2725
import org.apache.hadoop.hbase.util.FutureUtils;
2826
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -71,15 +69,13 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
7169
}
7270

7371
/**
74-
* Create a new {@link AsyncClusterConnection} instance for a region server.
72+
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
73+
* {@link ConnectionRegistryEndpoint}.
7574
*/
76-
public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer)
75+
public static AsyncClusterConnection createAsyncClusterConnection(
76+
ConnectionRegistryEndpoint endpoint, Configuration conf, SocketAddress localAddress, User user)
7777
throws IOException {
78-
RegionServerRegistry registry = new RegionServerRegistry(regionServer);
79-
Configuration conf = regionServer.getConfiguration();
80-
InetSocketAddress localAddress =
81-
new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0);
82-
User user = regionServer.getUserProvider().getCurrent();
78+
ShortCircuitConnectionRegistry registry = new ShortCircuitConnectionRegistry(endpoint);
8379
return createAsyncClusterConnection(conf, registry, localAddress, user);
8480
}
8581
}

0 commit comments

Comments
 (0)