33
33
import java .util .concurrent .atomic .AtomicInteger ;
34
34
import java .util .function .Predicate ;
35
35
import org .apache .hadoop .conf .Configuration ;
36
- import org .apache .hadoop .hbase .HConstants ;
37
36
import org .apache .hadoop .hbase .HRegionLocation ;
38
37
import org .apache .hadoop .hbase .RegionLocations ;
39
38
import org .apache .hadoop .hbase .ServerName ;
40
39
import org .apache .hadoop .hbase .exceptions .ClientExceptionsUtil ;
41
40
import org .apache .hadoop .hbase .exceptions .MasterRegistryFetchException ;
42
41
import org .apache .hadoop .hbase .ipc .HBaseRpcController ;
43
- import org .apache .hadoop .hbase .ipc .RpcClient ;
44
- import org .apache .hadoop .hbase .ipc .RpcClientFactory ;
45
42
import org .apache .hadoop .hbase .ipc .RpcControllerFactory ;
46
43
import org .apache .hadoop .hbase .security .User ;
47
44
import org .apache .hadoop .hbase .util .FutureUtils ;
48
45
import org .apache .yetus .audience .InterfaceAudience ;
49
46
50
- import org .apache .hbase .thirdparty .com .google .common .base .Preconditions ;
51
- import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableMap ;
52
47
import org .apache .hbase .thirdparty .com .google .protobuf .Message ;
53
48
import org .apache .hbase .thirdparty .com .google .protobuf .RpcCallback ;
54
49
@@ -79,30 +74,21 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
79
74
80
75
private final int hedgedReadFanOut ;
81
76
82
- // Configured list of end points to probe the meta information from.
83
- private volatile ImmutableMap <ServerName , ClientMetaService .Interface > addr2Stub ;
84
-
85
77
// RPC client used to talk to the masters.
86
- private final RpcClient rpcClient ;
78
+ private final ConnectionRegistryRpcStubHolder rpcStubHolder ;
87
79
private final RpcControllerFactory rpcControllerFactory ;
88
- private final int rpcTimeoutMs ;
89
80
90
81
private final RegistryEndpointsRefresher registryEndpointRefresher ;
91
82
92
- protected AbstractRpcBasedConnectionRegistry (Configuration conf ,
83
+ protected AbstractRpcBasedConnectionRegistry (Configuration conf , User user ,
93
84
String hedgedReqsFanoutConfigName , String initialRefreshDelaySecsConfigName ,
94
85
String refreshIntervalSecsConfigName , String minRefreshIntervalSecsConfigName )
95
86
throws IOException {
96
87
this .hedgedReadFanOut =
97
88
Math .max (1 , conf .getInt (hedgedReqsFanoutConfigName , HEDGED_REQS_FANOUT_DEFAULT ));
98
- rpcTimeoutMs = (int ) Math .min (Integer .MAX_VALUE ,
99
- conf .getLong (HConstants .HBASE_RPC_TIMEOUT_KEY , HConstants .DEFAULT_HBASE_RPC_TIMEOUT ));
100
- // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
101
- // this through the master registry...
102
- // This is a problem as we will use the cluster id to determine the authentication method
103
- rpcClient = RpcClientFactory .createClient (conf , null );
104
89
rpcControllerFactory = RpcControllerFactory .instantiate (conf );
105
- populateStubs (getBootstrapNodes (conf ));
90
+ rpcStubHolder = new ConnectionRegistryRpcStubHolder (conf , user , rpcControllerFactory ,
91
+ getBootstrapNodes (conf ));
106
92
// could return null here is refresh interval is less than zero
107
93
registryEndpointRefresher =
108
94
RegistryEndpointsRefresher .create (conf , initialRefreshDelaySecsConfigName ,
@@ -114,19 +100,7 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
114
100
protected abstract CompletableFuture <Set <ServerName >> fetchEndpoints ();
115
101
116
102
private void refreshStubs () throws IOException {
117
- populateStubs (FutureUtils .get (fetchEndpoints ()));
118
- }
119
-
120
- private void populateStubs (Set <ServerName > addrs ) throws IOException {
121
- Preconditions .checkNotNull (addrs );
122
- ImmutableMap .Builder <ServerName , ClientMetaService .Interface > builder =
123
- ImmutableMap .builderWithExpectedSize (addrs .size ());
124
- User user = User .getCurrent ();
125
- for (ServerName masterAddr : addrs ) {
126
- builder .put (masterAddr ,
127
- ClientMetaService .newStub (rpcClient .createRpcChannel (masterAddr , user , rpcTimeoutMs )));
128
- }
129
- addr2Stub = builder .build ();
103
+ rpcStubHolder .refreshStubs (() -> FutureUtils .get (fetchEndpoints ()));
130
104
}
131
105
132
106
/**
@@ -211,20 +185,25 @@ private <T extends Message> void groupCall(CompletableFuture<T> future, Set<Serv
211
185
212
186
protected final <T extends Message > CompletableFuture <T > call (Callable <T > callable ,
213
187
Predicate <T > isValidResp , String debug ) {
214
- ImmutableMap <ServerName , ClientMetaService .Interface > addr2StubRef = addr2Stub ;
215
- Set <ServerName > servers = addr2StubRef .keySet ();
216
- List <ClientMetaService .Interface > stubs = new ArrayList <>(addr2StubRef .values ());
217
- Collections .shuffle (stubs , ThreadLocalRandom .current ());
218
188
CompletableFuture <T > future = new CompletableFuture <>();
219
- groupCall (future , servers , stubs , 0 , callable , isValidResp , debug ,
220
- new ConcurrentLinkedQueue <>());
189
+ FutureUtils .addListener (rpcStubHolder .getStubs (), (addr2Stub , error ) -> {
190
+ if (error != null ) {
191
+ future .completeExceptionally (error );
192
+ return ;
193
+ }
194
+ Set <ServerName > servers = addr2Stub .keySet ();
195
+ List <ClientMetaService .Interface > stubs = new ArrayList <>(addr2Stub .values ());
196
+ Collections .shuffle (stubs , ThreadLocalRandom .current ());
197
+ groupCall (future , servers , stubs , 0 , callable , isValidResp , debug ,
198
+ new ConcurrentLinkedQueue <>());
199
+ });
221
200
return future ;
222
201
}
223
202
224
203
@ RestrictedApi (explanation = "Should only be called in tests" , link = "" ,
225
204
allowedOnPath = ".*/src/test/.*" )
226
- Set <ServerName > getParsedServers () {
227
- return addr2Stub .keySet ();
205
+ Set <ServerName > getParsedServers () throws IOException {
206
+ return FutureUtils . get ( rpcStubHolder . getStubs ()) .keySet ();
228
207
}
229
208
230
209
/**
@@ -277,8 +256,8 @@ public void close() {
277
256
if (registryEndpointRefresher != null ) {
278
257
registryEndpointRefresher .stop ();
279
258
}
280
- if (rpcClient != null ) {
281
- rpcClient .close ();
259
+ if (rpcStubHolder != null ) {
260
+ rpcStubHolder .close ();
282
261
}
283
262
}, getClass ().getSimpleName () + ".close" );
284
263
}
0 commit comments