Skip to content

Commit 7ade32f

Browse files
committed
HBASE-25051 DIGEST based auth broken for rpc based ConnectionRegistry
1 parent 6b0ce08 commit 7ade32f

File tree

46 files changed

+716
-199
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+716
-199
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,17 @@
3333
import java.util.concurrent.atomic.AtomicInteger;
3434
import java.util.function.Predicate;
3535
import org.apache.hadoop.conf.Configuration;
36-
import org.apache.hadoop.hbase.HConstants;
3736
import org.apache.hadoop.hbase.HRegionLocation;
3837
import org.apache.hadoop.hbase.RegionLocations;
3938
import org.apache.hadoop.hbase.ServerName;
4039
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
4140
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
4241
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
43-
import org.apache.hadoop.hbase.ipc.RpcClient;
44-
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
4542
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
4643
import org.apache.hadoop.hbase.security.User;
4744
import org.apache.hadoop.hbase.util.FutureUtils;
4845
import org.apache.yetus.audience.InterfaceAudience;
4946

50-
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
51-
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
5247
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
5348
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
5449

@@ -79,30 +74,21 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
7974

8075
private final int hedgedReadFanOut;
8176

82-
// Configured list of end points to probe the meta information from.
83-
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
84-
8577
// RPC client used to talk to the masters.
86-
private final RpcClient rpcClient;
78+
private final ConnectionRegistryRpcStubHolder rpcStubHolder;
8779
private final RpcControllerFactory rpcControllerFactory;
88-
private final int rpcTimeoutMs;
8980

9081
private final RegistryEndpointsRefresher registryEndpointRefresher;
9182

92-
protected AbstractRpcBasedConnectionRegistry(Configuration conf,
83+
protected AbstractRpcBasedConnectionRegistry(Configuration conf, User user,
9384
String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
9485
String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
9586
throws IOException {
9687
this.hedgedReadFanOut =
9788
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
98-
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
99-
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
100-
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
101-
// this through the master registry...
102-
// This is a problem as we will use the cluster id to determine the authentication method
103-
rpcClient = RpcClientFactory.createClient(conf, null);
10489
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
105-
populateStubs(getBootstrapNodes(conf));
90+
rpcStubHolder = new ConnectionRegistryRpcStubHolder(conf, user, rpcControllerFactory,
91+
getBootstrapNodes(conf));
10692
// could return null here is refresh interval is less than zero
10793
registryEndpointRefresher =
10894
RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
@@ -114,19 +100,7 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
114100
protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
115101

116102
private void refreshStubs() throws IOException {
117-
populateStubs(FutureUtils.get(fetchEndpoints()));
118-
}
119-
120-
private void populateStubs(Set<ServerName> addrs) throws IOException {
121-
Preconditions.checkNotNull(addrs);
122-
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
123-
ImmutableMap.builderWithExpectedSize(addrs.size());
124-
User user = User.getCurrent();
125-
for (ServerName masterAddr : addrs) {
126-
builder.put(masterAddr,
127-
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
128-
}
129-
addr2Stub = builder.build();
103+
rpcStubHolder.refreshStubs(() -> FutureUtils.get(fetchEndpoints()));
130104
}
131105

132106
/**
@@ -211,20 +185,24 @@ private <T extends Message> void groupCall(CompletableFuture<T> future, Set<Serv
211185

212186
protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
213187
Predicate<T> isValidResp, String debug) {
214-
ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
215-
Set<ServerName> servers = addr2StubRef.keySet();
216-
List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
217-
Collections.shuffle(stubs, ThreadLocalRandom.current());
218188
CompletableFuture<T> future = new CompletableFuture<>();
219-
groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
220-
new ConcurrentLinkedQueue<>());
189+
FutureUtils.addListener(rpcStubHolder.getStubs(), (addr2Stub, error) -> {
190+
if (error != null) {
191+
future.completeExceptionally(error);
192+
}
193+
Set<ServerName> servers = addr2Stub.keySet();
194+
List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2Stub.values());
195+
Collections.shuffle(stubs, ThreadLocalRandom.current());
196+
groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
197+
new ConcurrentLinkedQueue<>());
198+
});
221199
return future;
222200
}
223201

224202
@RestrictedApi(explanation = "Should only be called in tests", link = "",
225203
allowedOnPath = ".*/src/test/.*")
226-
Set<ServerName> getParsedServers() {
227-
return addr2Stub.keySet();
204+
Set<ServerName> getParsedServers() throws IOException {
205+
return FutureUtils.get(rpcStubHolder.getStubs()).keySet();
228206
}
229207

230208
/**
@@ -277,8 +255,8 @@ public void close() {
277255
if (registryEndpointRefresher != null) {
278256
registryEndpointRefresher.stop();
279257
}
280-
if (rpcClient != null) {
281-
rpcClient.close();
258+
if (rpcStubHolder != null) {
259+
rpcStubHolder.close();
282260
}
283261
}, getClass().getSimpleName() + ".close");
284262
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
338338
final User user, Map<String, byte[]> connectionAttributes) {
339339
return TraceUtil.tracedFuture(() -> {
340340
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
341-
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
341+
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user);
342342
addListener(registry.getClusterId(), (clusterId, error) -> {
343343
if (error != null) {
344344
registry.close();

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.apache.hadoop.hbase.ServerName;
3030
import org.apache.hadoop.hbase.TableName;
3131
import org.apache.hadoop.hbase.log.HBaseMarkers;
32-
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
3332
import org.apache.hadoop.hbase.util.FutureUtils;
33+
import org.apache.hadoop.hbase.util.IOExceptionSupplier;
3434
import org.apache.yetus.audience.InterfaceAudience;
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
2121

2222
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.security.User;
2324
import org.apache.hadoop.hbase.util.ReflectionUtils;
2425
import org.apache.yetus.audience.InterfaceAudience;
2526

@@ -33,10 +34,10 @@ private ConnectionRegistryFactory() {
3334
}
3435

3536
/** Returns The connection registry implementation to use. */
36-
static ConnectionRegistry getRegistry(Configuration conf) {
37+
static ConnectionRegistry getRegistry(Configuration conf, User user) {
3738
Class<? extends ConnectionRegistry> clazz =
3839
conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class,
3940
ConnectionRegistry.class);
40-
return ReflectionUtils.newInstance(clazz, conf);
41+
return ReflectionUtils.newInstance(clazz, conf, user);
4142
}
4243
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Set;
27+
import java.util.concurrent.CompletableFuture;
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.hbase.HConstants;
30+
import org.apache.hadoop.hbase.ServerName;
31+
import org.apache.hadoop.hbase.ipc.BadAuthException;
32+
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
33+
import org.apache.hadoop.hbase.ipc.RpcClient;
34+
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
35+
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
36+
import org.apache.hadoop.hbase.security.User;
37+
import org.apache.hadoop.hbase.util.IOExceptionSupplier;
38+
import org.apache.hadoop.ipc.RemoteException;
39+
import org.apache.yetus.audience.InterfaceAudience;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
44+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
45+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
46+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
47+
48+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
49+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
50+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
51+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
52+
53+
/**
54+
* A class for creating {@link RpcClient} and related stubs used by
55+
* {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap nodes to get the
56+
* cluster id first, before creating the final {@link RpcClient} and related stubs.
57+
* <p>
58+
* See HBASE-25051 for more details.
59+
*/
60+
@InterfaceAudience.Private
61+
class ConnectionRegistryRpcStubHolder implements Closeable {
62+
63+
private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
64+
65+
private final Configuration conf;
66+
67+
// used for getting cluster id
68+
private final Configuration noAuthConf;
69+
70+
private final User user;
71+
72+
private final RpcControllerFactory rpcControllerFactory;
73+
74+
private final Set<ServerName> bootstrapNodes;
75+
76+
private final int rpcTimeoutMs;
77+
78+
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
79+
80+
private volatile RpcClient rpcClient;
81+
82+
private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> addr2StubFuture;
83+
84+
ConnectionRegistryRpcStubHolder(Configuration conf, User user,
85+
RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) {
86+
this.conf = conf;
87+
if (User.isHBaseSecurityEnabled(conf)) {
88+
this.noAuthConf = new Configuration(conf);
89+
this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
90+
} else {
91+
this.noAuthConf = conf;
92+
}
93+
this.user = user;
94+
this.rpcControllerFactory = rpcControllerFactory;
95+
this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
96+
this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
97+
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
98+
}
99+
100+
private ImmutableMap<ServerName, ClientMetaService.Interface> createStubs(RpcClient rpcClient,
101+
Collection<ServerName> addrs) {
102+
LOG.debug("Going to use new servers to create stubs: {}", addrs);
103+
Preconditions.checkNotNull(addrs);
104+
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
105+
ImmutableMap.builderWithExpectedSize(addrs.size());
106+
for (ServerName masterAddr : addrs) {
107+
builder.put(masterAddr,
108+
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
109+
}
110+
return builder.build();
111+
}
112+
113+
private boolean isBadAuthException(IOException e) {
114+
return e instanceof RemoteException
115+
&& BadAuthException.class.getName().equals(((RemoteException) e).getClassName());
116+
}
117+
118+
private void createStubsAndComplete(String clusterId,
119+
CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future) {
120+
RpcClient c = RpcClientFactory.createClient(conf, clusterId);
121+
ImmutableMap<ServerName, ClientMetaService.Interface> m = createStubs(c, bootstrapNodes);
122+
rpcClient = c;
123+
addr2Stub = m;
124+
future.complete(m);
125+
}
126+
127+
private void createStubs(List<ServerName> bootstrapServers, int index,
128+
CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future) {
129+
// use null cluster id here as we do not know the cluster id yet, we will fetch it through this
130+
// rpc client and use it to create the final rpc client
131+
RpcClient getConnectionRegistryRpcClient = RpcClientFactory.createClient(noAuthConf, null);
132+
// user and rpcTimeout are both not important here, as we will not actually send any rpc calls
133+
// out, only a preamble connection header, but if we pass null as user, there will be NPE in
134+
// some code paths...
135+
RpcChannel channel =
136+
getConnectionRegistryRpcClient.createRpcChannel(bootstrapServers.get(index), user, 0);
137+
ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
138+
HBaseRpcController controller = rpcControllerFactory.newController();
139+
stub.getConnectionRegistry(controller, GetConnectionRegistryRequest.getDefaultInstance(),
140+
new RpcCallback<GetConnectionRegistryResponse>() {
141+
142+
@Override
143+
public void run(GetConnectionRegistryResponse resp) {
144+
synchronized (ConnectionRegistryRpcStubHolder.this) {
145+
addr2StubFuture = null;
146+
if (controller.failed()) {
147+
if (isBadAuthException(controller.getFailed())) {
148+
// this means we have connected to an old server where it does not support passing
149+
// cluster id through preamble connnection header, so we fallback to use null
150+
// cluster id, which is the old behavior
151+
LOG.debug("Failed to get connection registry info, should be an old server,"
152+
+ " fallback to use null cluster id", controller.getFailed());
153+
createStubsAndComplete(null, future);
154+
} else {
155+
LOG.debug("Failed to get connection registry info", controller.getFailed());
156+
if (index == bootstrapServers.size() - 1) {
157+
future.completeExceptionally(controller.getFailed());
158+
} else {
159+
// try next bootstrap server
160+
createStubs(bootstrapServers, index + 1, future);
161+
}
162+
}
163+
} else {
164+
LOG.debug("Got connection registry info: {}", resp);
165+
String clusterId = resp.getClusterId();
166+
createStubsAndComplete(clusterId, future);
167+
}
168+
}
169+
getConnectionRegistryRpcClient.close();
170+
}
171+
});
172+
}
173+
174+
private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> createStubs() {
175+
CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future =
176+
new CompletableFuture<>();
177+
List<ServerName> bootstrapServers = new ArrayList<ServerName>(bootstrapNodes);
178+
Collections.shuffle(bootstrapServers);
179+
createStubs(bootstrapServers, 0, future);
180+
return future;
181+
}
182+
183+
CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> getStubs() {
184+
ImmutableMap<ServerName, ClientMetaService.Interface> s = this.addr2Stub;
185+
if (s != null) {
186+
return CompletableFuture.completedFuture(s);
187+
}
188+
synchronized (this) {
189+
s = this.addr2Stub;
190+
if (s != null) {
191+
return CompletableFuture.completedFuture(s);
192+
}
193+
if (addr2StubFuture != null) {
194+
return addr2StubFuture;
195+
}
196+
addr2StubFuture = createStubs();
197+
return addr2StubFuture;
198+
}
199+
}
200+
201+
void refreshStubs(IOExceptionSupplier<Collection<ServerName>> fetchEndpoints) throws IOException {
202+
// There is no actual call yet so we have not initialize the rpc client and related stubs yet,
203+
// give up refreshing
204+
if (addr2Stub == null) {
205+
LOG.debug("Skip refreshing stubs as we have not initialized rpc client yet");
206+
return;
207+
}
208+
LOG.debug("Going to refresh stubs");
209+
assert rpcClient != null;
210+
addr2Stub = createStubs(rpcClient, fetchEndpoints.get());
211+
}
212+
213+
@Override
214+
public void close() {
215+
if (rpcClient != null) {
216+
rpcClient.close();
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)