Skip to content

Commit 573e04a

Browse files
author
Duo Zhang
committed
HBASE-26214 Introduce a ConnectionRegistryEndpoint interface
1 parent 6ed03d9 commit 573e04a

File tree

10 files changed

+112
-59
lines changed

10 files changed

+112
-59
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.hadoop.hbase;
1919

2020
import java.util.ArrayList;
21+
import java.util.Collections;
2122
import java.util.List;
22-
import java.util.Optional;
2323
import java.util.concurrent.ConcurrentNavigableMap;
2424
import java.util.concurrent.ThreadFactory;
2525
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -203,19 +203,19 @@ private void updateMetaLocation(String path, ZNodeOpType opType) {
203203
* @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty.
204204
*
205205
*/
206-
public Optional<List<HRegionLocation>> getMetaRegionLocations() {
206+
public List<HRegionLocation> getMetaRegionLocations() {
207207
ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
208208
cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
209209
if (snapshot.isEmpty()) {
210210
// This could be possible if the master has not successfully initialized yet or meta region
211211
// is stuck in some weird state.
212-
return Optional.empty();
212+
return Collections.emptyList();
213213
}
214214
List<HRegionLocation> result = new ArrayList<>();
215215
// Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
216216
// ArrayValueCollection does not implement toArray().
217217
snapshot.values().forEach(location -> result.add(location));
218-
return Optional.of(result);
218+
return result;
219219
}
220220

221221
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import java.io.IOException;
21-
import java.net.InetSocketAddress;
2221
import java.net.SocketAddress;
2322
import java.security.PrivilegedExceptionAction;
2423
import org.apache.hadoop.conf.Configuration;
25-
import org.apache.hadoop.hbase.regionserver.HRegionServer;
2624
import org.apache.hadoop.hbase.security.User;
2725
import org.apache.hadoop.hbase.util.FutureUtils;
2826
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -71,15 +69,13 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
7169
}
7270

7371
/**
74-
* Create a new {@link AsyncClusterConnection} instance for a region server.
72+
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
73+
* {@link ConnectionRegistryEndpoint}.
7574
*/
76-
public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer)
75+
public static AsyncClusterConnection createAsyncClusterConnection(
76+
ConnectionRegistryEndpoint endpoint, Configuration conf, SocketAddress localAddress, User user)
7777
throws IOException {
78-
RegionServerRegistry registry = new RegionServerRegistry(regionServer);
79-
Configuration conf = regionServer.getConfiguration();
80-
InetSocketAddress localAddress =
81-
new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0);
82-
User user = regionServer.getUserProvider().getCurrent();
78+
ShortCircuitConnectionRegistry registry = new ShortCircuitConnectionRegistry(endpoint);
8379
return createAsyncClusterConnection(conf, registry, localAddress, user);
8480
}
8581
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.util.List;
21+
import java.util.Optional;
22+
import org.apache.hadoop.hbase.HRegionLocation;
23+
import org.apache.hadoop.hbase.ServerName;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
26+
/**
27+
* Define the necessary method for carrying {@code ClientMetaService}.
28+
*/
29+
@InterfaceAudience.Private
30+
public interface ConnectionRegistryEndpoint {
31+
32+
/**
33+
* Get cluster id.
34+
*/
35+
String getClusterId();
36+
37+
/**
38+
* Get active master address.
39+
*/
40+
Optional<ServerName> getActiveMaster();
41+
42+
/**
43+
* Get backup masters address.
44+
*/
45+
List<ServerName> getBackupMasters();
46+
47+
/**
48+
* Get all the region servers address.
49+
*/
50+
List<ServerName> getRegionServers();
51+
52+
/**
53+
* Get the location of meta regions.
54+
*/
55+
List<HRegionLocation> getMetaLocations();
56+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java renamed to hbase-server/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitConnectionRegistry.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,48 +24,42 @@
2424
import org.apache.hadoop.hbase.HRegionLocation;
2525
import org.apache.hadoop.hbase.RegionLocations;
2626
import org.apache.hadoop.hbase.ServerName;
27-
import org.apache.hadoop.hbase.regionserver.HRegionServer;
2827
import org.apache.yetus.audience.InterfaceAudience;
2928

3029
/**
31-
* Connection registry implementation for region server.
30+
* A {@link ConnectionRegistry} implementation used at server side, where we could use the
31+
* {@link ConnectionRegistryEndpoint} directly, without any rpcs.
3232
*/
3333
@InterfaceAudience.Private
34-
public class RegionServerRegistry implements ConnectionRegistry {
34+
class ShortCircuitConnectionRegistry implements ConnectionRegistry {
3535

36-
private final HRegionServer regionServer;
36+
private final ConnectionRegistryEndpoint endpoint;
3737

38-
public RegionServerRegistry(HRegionServer regionServer) {
39-
this.regionServer = regionServer;
38+
public ShortCircuitConnectionRegistry(ConnectionRegistryEndpoint endpoint) {
39+
this.endpoint = endpoint;
4040
}
4141

4242
@Override
4343
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
4444
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
45-
Optional<List<HRegionLocation>> locs =
46-
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
47-
if (locs.isPresent()) {
48-
List<HRegionLocation> list = locs.get();
49-
if (list.isEmpty()) {
50-
future.completeExceptionally(new IOException("no meta location available"));
51-
} else {
52-
future.complete(new RegionLocations(list));
53-
}
54-
} else {
45+
List<HRegionLocation> locs = endpoint.getMetaLocations();
46+
if (locs.isEmpty()) {
5547
future.completeExceptionally(new IOException("no meta location available"));
48+
} else {
49+
future.complete(new RegionLocations(locs));
5650
}
5751
return future;
5852
}
5953

6054
@Override
6155
public CompletableFuture<String> getClusterId() {
62-
return CompletableFuture.completedFuture(regionServer.getClusterId());
56+
return CompletableFuture.completedFuture(endpoint.getClusterId());
6357
}
6458

6559
@Override
6660
public CompletableFuture<ServerName> getActiveMaster() {
6761
CompletableFuture<ServerName> future = new CompletableFuture<>();
68-
Optional<ServerName> activeMaster = regionServer.getActiveMaster();
62+
Optional<ServerName> activeMaster = endpoint.getActiveMaster();
6963
if (activeMaster.isPresent()) {
7064
future.complete(activeMaster.get());
7165
} else {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
2424
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
2525

26+
import com.google.errorprone.annotations.RestrictedApi;
2627
import java.io.IOException;
2728
import java.io.PrintWriter;
2829
import java.lang.management.MemoryType;
@@ -75,6 +76,7 @@
7576
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
7677
import org.apache.hadoop.hbase.HConstants;
7778
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
79+
import org.apache.hadoop.hbase.HRegionLocation;
7880
import org.apache.hadoop.hbase.HealthCheckChore;
7981
import org.apache.hadoop.hbase.MetaRegionLocationCache;
8082
import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -91,6 +93,7 @@
9193
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
9294
import org.apache.hadoop.hbase.client.Connection;
9395
import org.apache.hadoop.hbase.client.ConnectionFactory;
96+
import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
9497
import org.apache.hadoop.hbase.client.ConnectionUtils;
9598
import org.apache.hadoop.hbase.client.RegionInfo;
9699
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -247,8 +250,9 @@
247250
*/
248251
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
249252
@SuppressWarnings({ "deprecation"})
250-
public class HRegionServer extends Thread implements
251-
RegionServerServices, LastSequenceId, ConfigurationObserver {
253+
public class HRegionServer extends Thread implements RegionServerServices, LastSequenceId,
254+
ConnectionRegistryEndpoint, ConfigurationObserver {
255+
252256
private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
253257

254258
/**
@@ -887,7 +891,11 @@ public String getClusterId() {
887891
*/
888892
protected final synchronized void setupClusterConnection() throws IOException {
889893
if (asyncClusterConnection == null) {
890-
asyncClusterConnection = ClusterConnectionFactory.createAsyncClusterConnection(this);
894+
InetSocketAddress localAddress =
895+
new InetSocketAddress(rpcServices.getSocketAddress().getAddress(), 0);
896+
User user = userProvider.getCurrent();
897+
asyncClusterConnection =
898+
ClusterConnectionFactory.createAsyncClusterConnection(this, conf, localAddress, user);
891899
}
892900
}
893901

@@ -3983,23 +3991,29 @@ public long getRetryPauseTime() {
39833991
return this.retryPauseTime;
39843992
}
39853993

3994+
@Override
39863995
public Optional<ServerName> getActiveMaster() {
39873996
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
39883997
}
39893998

3999+
@Override
39904000
public List<ServerName> getBackupMasters() {
39914001
return masterAddressTracker.getBackupMasters();
39924002
}
39934003

4004+
@Override
39944005
public List<ServerName> getRegionServers() {
39954006
return regionServerAddressTracker.getRegionServers();
39964007
}
39974008

3998-
public MetaRegionLocationCache getMetaRegionLocationCache() {
3999-
return this.metaRegionLocationCache;
4009+
@Override
4010+
public List<HRegionLocation> getMetaLocations() {
4011+
return metaRegionLocationCache.getMetaRegionLocations();
40004012
}
40014013

4002-
public UserProvider getUserProvider() {
4003-
return userProvider;
4014+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
4015+
allowedOnPath = ".*/src/test/.*")
4016+
public MetaRegionLocationCache getMetaRegionLocationCache() {
4017+
return metaRegionLocationCache;
40044018
}
40054019
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import org.apache.hadoop.hbase.DroppedSnapshotException;
6161
import org.apache.hadoop.hbase.HBaseIOException;
6262
import org.apache.hadoop.hbase.HConstants;
63-
import org.apache.hadoop.hbase.HRegionLocation;
6463
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
6564
import org.apache.hadoop.hbase.NotServingRegionException;
6665
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -4114,10 +4113,8 @@ public GetMastersResponse getMasters(RpcController controller, GetMastersRequest
41144113
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
41154114
GetMetaRegionLocationsRequest request) throws ServiceException {
41164115
GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
4117-
Optional<List<HRegionLocation>> metaLocations =
4118-
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
4119-
metaLocations.ifPresent(hRegionLocations -> hRegionLocations
4120-
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
4116+
regionServer.getMetaLocations()
4117+
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
41214118
return builder.build();
41224119
}
41234120

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,7 @@ public void testMasterAddressParsing() throws IOException {
112112
public void testRegistryRPCs() throws Exception {
113113
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
114114
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
115-
final int size =
116-
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
115+
final int size = activeMaster.getMetaLocations().size();
117116
for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
118117
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
119118
try (MasterRegistry registry = new MasterRegistry(conf)) {
@@ -124,8 +123,7 @@ public void testRegistryRPCs() throws Exception {
124123
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
125124
List<HRegionLocation> metaLocations =
126125
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
127-
List<HRegionLocation> actualMetaLocations =
128-
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
126+
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
129127
Collections.sort(metaLocations);
130128
Collections.sort(actualMetaLocations);
131129
assertEquals(actualMetaLocations, metaLocations);

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
2223

2324
import java.util.ArrayList;
2425
import java.util.Collections;
@@ -88,7 +89,7 @@ private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Excep
8889
private void verifyCachedMetaLocations(HMaster master) throws Exception {
8990
// Wait until initial meta locations are loaded.
9091
int retries = 0;
91-
while (!master.getMetaRegionLocationCache().getMetaRegionLocations().isPresent()) {
92+
while (master.getMetaRegionLocationCache().getMetaRegionLocations().isEmpty()) {
9293
Thread.sleep(1000);
9394
if (++retries == 10) {
9495
break;
@@ -98,15 +99,14 @@ private void verifyCachedMetaLocations(HMaster master) throws Exception {
9899
List<String> metaZnodes = zk.getMetaReplicaNodes();
99100
// Wait till all replicas available.
100101
retries = 0;
101-
while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes
102+
while (master.getMetaRegionLocationCache().getMetaRegionLocations().size() != metaZnodes
102103
.size()) {
103104
Thread.sleep(1000);
104105
if (++retries == 10) {
105106
break;
106107
}
107108
}
108-
List<HRegionLocation> metaHRLs =
109-
master.getMetaRegionLocationCache().getMetaRegionLocations().get();
109+
List<HRegionLocation> metaHRLs = master.getMetaRegionLocationCache().getMetaRegionLocations();
110110
assertFalse(metaHRLs.isEmpty());
111111
assertEquals(metaZnodes.size(), metaHRLs.size());
112112
List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
@@ -167,7 +167,7 @@ public void testStandByMetaLocations() throws Exception {
167167
try {
168168
MetaRegionLocationCache metaCache = new MetaRegionLocationCache(zkWatcher);
169169
// meta znodes do not exist at this point, cache should be empty.
170-
assertFalse(metaCache.getMetaRegionLocations().isPresent());
170+
assertTrue(metaCache.getMetaRegionLocations().isEmpty());
171171
// Set the meta locations for a random meta replicas, simulating an active hmaster meta
172172
// assignment.
173173
for (int i = 0; i < 3; i++) {
@@ -177,13 +177,12 @@ public void testStandByMetaLocations() throws Exception {
177177
// Wait until the meta cache is populated.
178178
int iters = 0;
179179
while (iters++ < 10) {
180-
if (metaCache.getMetaRegionLocations().isPresent()
181-
&& metaCache.getMetaRegionLocations().get().size() == 3) {
180+
if (metaCache.getMetaRegionLocations().size() == 3) {
182181
break;
183182
}
184183
Thread.sleep(1000);
185184
}
186-
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations().get();
185+
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations();
187186
assertEquals(3, metaLocations.size());
188187
for (HRegionLocation location : metaLocations) {
189188
assertEquals(sn, location.getServerName());

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@ public void testRegistryRPCs() throws Exception {
9494
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
9595
List<HRegionLocation> metaLocations =
9696
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
97-
List<HRegionLocation> actualMetaLocations =
98-
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
97+
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
9998
Collections.sort(metaLocations);
10099
Collections.sort(actualMetaLocations);
101100
assertEquals(actualMetaLocations, metaLocations);

hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ private static HBaseRpcController getRpcController() {
144144
*/
145145
@Test public void TestMetaLocations() throws Exception {
146146
HBaseRpcController rpcController = getRpcController();
147-
List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
148-
.getMetaRegionLocationCache().getMetaRegionLocations().get();
147+
List<HRegionLocation> metaLocations =
148+
TEST_UTIL.getMiniHBaseCluster().getMaster().getMetaLocations();
149149
Collections.sort(metaLocations);
150150
int rpcCount = 0;
151151
for (JVMClusterUtil.MasterThread masterThread:

0 commit comments

Comments
 (0)