Skip to content

Commit 6df2a9d

Browse files
committed
HBASE-26474 Implement connection-level attributes
Add support for `db.system`, `db.connection_string`, `db.user`.
1 parent 8f5a12f commit 6df2a9d

File tree

21 files changed

+477
-124
lines changed

21 files changed

+477
-124
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ public CompletableFuture<ServerName> getActiveMaster() {
271271
getClass().getSimpleName() + ".getClusterId");
272272
}
273273

274+
@Override
275+
public String getConnectionString() {
276+
return "unimplemented";
277+
}
278+
274279
@Override
275280
public void close() {
276281
trace(() -> {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
* The implementation of AsyncConnection.
7575
*/
7676
@InterfaceAudience.Private
77-
class AsyncConnectionImpl implements AsyncConnection {
77+
public class AsyncConnectionImpl implements AsyncConnection {
7878

7979
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
8080

@@ -198,6 +198,14 @@ synchronized ChoreService getChoreService() {
198198
return choreService;
199199
}
200200

201+
public User getUser() {
202+
return user;
203+
}
204+
205+
public ConnectionRegistry getConnectionRegistry() {
206+
return registry;
207+
}
208+
201209
@Override
202210
public Configuration getConfiguration() {
203211
return conf;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -20,24 +20,27 @@
2020
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
2121
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
2222
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
23-
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
24-
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
2523
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
2624

2725
import io.opentelemetry.api.trace.Span;
2826
import io.opentelemetry.api.trace.StatusCode;
2927
import io.opentelemetry.context.Scope;
30-
import java.util.ArrayList;
3128
import java.util.Arrays;
29+
import java.util.Collections;
3230
import java.util.List;
31+
import java.util.Objects;
32+
import java.util.Optional;
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.TimeUnit;
3535
import java.util.function.Function;
3636
import java.util.function.Supplier;
37+
import java.util.stream.Collectors;
3738
import org.apache.hadoop.hbase.HRegionLocation;
3839
import org.apache.hadoop.hbase.RegionLocations;
3940
import org.apache.hadoop.hbase.ServerName;
4041
import org.apache.hadoop.hbase.TableName;
42+
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
43+
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
4144
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
4245
import org.apache.hadoop.hbase.trace.TraceUtil;
4346
import org.apache.hadoop.hbase.util.Bytes;
@@ -96,9 +99,12 @@ private boolean isMeta(TableName tableName) {
9699
return TableName.isMetaTableName(tableName);
97100
}
98101

99-
private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
100-
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
101-
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
102+
private <T> CompletableFuture<T> tracedLocationFuture(
103+
Supplier<CompletableFuture<T>> action,
104+
Function<T, List<String>> getRegionNames,
105+
Supplier<Span> spanSupplier
106+
) {
107+
final Span span = spanSupplier.get();
102108
try (Scope scope = span.makeCurrent()) {
103109
CompletableFuture<T> future = action.get();
104110
FutureUtils.addListener(future, (resp, error) -> {
@@ -117,18 +123,29 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
117123
}
118124
}
119125

120-
private List<String> getRegionName(RegionLocations locs) {
121-
List<String> names = new ArrayList<>();
122-
for (HRegionLocation loc : locs.getRegionLocations()) {
123-
if (loc != null) {
124-
names.add(loc.getRegion().getRegionNameAsString());
125-
}
126-
}
127-
return names;
126+
private static List<String> getRegionNames(RegionLocations locs) {
127+
if (locs == null) { return Collections.emptyList(); }
128+
if (locs.getRegionLocations() == null) { return Collections.emptyList(); }
129+
return Arrays.stream(locs.getRegionLocations())
130+
.filter(Objects::nonNull)
131+
.map(HRegionLocation::getRegion)
132+
.map(RegionInfo::getRegionNameAsString)
133+
.collect(Collectors.toList());
134+
}
135+
136+
private static List<String> getRegionNames(HRegionLocation location) {
137+
return Optional.ofNullable(location)
138+
.map(HRegionLocation::getRegion)
139+
.map(RegionInfo::getRegionNameAsString)
140+
.map(Collections::singletonList)
141+
.orElseGet(Collections::emptyList);
128142
}
129143

130144
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
131145
RegionLocateType type, boolean reload, long timeoutNs) {
146+
final Supplier<Span> supplier = new TableSpanBuilder<>(conn)
147+
.setName("AsyncRegionLocator.getRegionLocations")
148+
.setTableName(tableName);
132149
return tracedLocationFuture(() -> {
133150
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
134151
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
@@ -138,11 +155,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
138155
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
139156
"ms) waiting for region locations for " + tableName + ", row='" +
140157
Bytes.toStringBinary(row) + "'");
141-
}, this::getRegionName, tableName, "getRegionLocations");
158+
}, AsyncRegionLocator::getRegionNames, supplier);
142159
}
143160

144161
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
145162
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
163+
final Supplier<Span> supplier = new TableSpanBuilder<>(conn)
164+
.setName("AsyncRegionLocator.getRegionLocation")
165+
.setTableName(tableName);
146166
return tracedLocationFuture(() -> {
147167
// meta region can not be split right now so we always call the same method.
148168
// Change it later if the meta table can have more than one regions.
@@ -173,8 +193,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
173193
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
174194
"ms) waiting for region location for " + tableName + ", row='" +
175195
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
176-
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
177-
"getRegionLocation");
196+
}, AsyncRegionLocator::getRegionNames, supplier);
178197
}
179198

180199
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
@@ -202,31 +221,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
202221
}
203222

204223
void clearCache(TableName tableName) {
224+
Supplier<Span> supplier = new TableSpanBuilder<>(conn)
225+
.setName("AsyncRegionLocator.clearCache")
226+
.setTableName(tableName);
205227
TraceUtil.trace(() -> {
206228
LOG.debug("Clear meta cache for {}", tableName);
207229
if (tableName.equals(META_TABLE_NAME)) {
208230
metaRegionLocator.clearCache();
209231
} else {
210232
nonMetaRegionLocator.clearCache(tableName);
211233
}
212-
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
234+
}, supplier);
213235
}
214236

215237
void clearCache(ServerName serverName) {
238+
Supplier<Span> supplier = new ConnectionSpanBuilder<>(conn)
239+
.setName("AsyncRegionLocator.clearCache")
240+
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
216241
TraceUtil.trace(() -> {
217242
LOG.debug("Clear meta cache for {}", serverName);
218243
metaRegionLocator.clearCache(serverName);
219244
nonMetaRegionLocator.clearCache(serverName);
220245
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
221-
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
222-
serverName.getServerName()));
246+
}, supplier);
223247
}
224248

225249
void clearCache() {
250+
Supplier<Span> supplier = new ConnectionSpanBuilder<>(conn)
251+
.setName("AsyncRegionLocator.clearCache");
226252
TraceUtil.trace(() -> {
227253
metaRegionLocator.clearCache();
228254
nonMetaRegionLocator.clearCache();
229-
}, "AsyncRegionLocator.clearCache");
255+
}, supplier);
230256
}
231257

232258
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* Internal use only.
3030
*/
3131
@InterfaceAudience.Private
32-
interface ConnectionRegistry extends Closeable {
32+
public interface ConnectionRegistry extends Closeable {
3333

3434
/**
3535
* Get the location of meta region(s).
@@ -48,6 +48,11 @@ interface ConnectionRegistry extends Closeable {
4848
*/
4949
CompletableFuture<ServerName> getActiveMaster();
5050

51+
/**
52+
* Return the connection string associated with this registry instance.
53+
*/
54+
String getConnectionString();
55+
5156
/**
5257
* Closes this instance and releases any system resources associated with it
5358
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,12 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
8787
return masterAddrs;
8888
}
8989

90+
private final String connectionString;
91+
9092
MasterRegistry(Configuration conf) throws IOException {
9193
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
9294
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
95+
connectionString = getConnectionString(conf);
9396
}
9497

9598
@Override
@@ -102,6 +105,15 @@ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
102105
return getMasters();
103106
}
104107

108+
@Override
109+
public String getConnectionString() {
110+
return connectionString;
111+
}
112+
113+
static String getConnectionString(Configuration conf) throws UnknownHostException {
114+
return getMasterAddr(conf);
115+
}
116+
105117
/**
106118
* Builds the default master address end point if it is not specified in the configuration.
107119
* <p/>

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,8 @@ private CompletableFuture<Result> get(Get get, int replicaId) {
222222
.replicaId(replicaId).call();
223223
}
224224

225-
private TableOperationSpanBuilder newTableOperationSpanBuilder() {
226-
return new TableOperationSpanBuilder().setTableName(tableName);
225+
private TableOperationSpanBuilder<?> newTableOperationSpanBuilder() {
226+
return new TableOperationSpanBuilder<>(conn).setTableName(tableName);
227227
}
228228

229229
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import java.io.IOException;
21+
import java.net.UnknownHostException;
2122
import java.util.Set;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.stream.Collectors;
@@ -72,9 +73,23 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
7273

7374
private static final char ADDRS_CONF_SEPARATOR = ',';
7475

76+
private final String connectionString;
77+
7578
RpcConnectionRegistry(Configuration conf) throws IOException {
7679
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
7780
MIN_SECS_BETWEEN_REFRESHES);
81+
connectionString = buildConnectionString(conf);
82+
}
83+
84+
private String buildConnectionString(Configuration conf) throws UnknownHostException {
85+
final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
86+
if (StringUtils.isBlank(configuredBootstrapNodes)) {
87+
return MasterRegistry.getConnectionString(conf);
88+
}
89+
return Splitter.on(ADDRS_CONF_SEPARATOR)
90+
.trimResults()
91+
.splitToStream(configuredBootstrapNodes)
92+
.collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR)));
7893
}
7994

8095
@Override
@@ -91,6 +106,11 @@ protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOExcepti
91106
}
92107
}
93108

109+
@Override
110+
public String getConnectionString() {
111+
return connectionString;
112+
}
113+
94114
private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
95115
return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
96116
.collect(Collectors.toSet());

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,13 @@ public CompletableFuture<ServerName> getActiveMaster() {
235235
"ZKConnectionRegistry.getActiveMaster");
236236
}
237237

238+
@Override
239+
public String getConnectionString() {
240+
final String serverList = zk.getConnectString();
241+
final String baseZNode = znodePaths.baseZNode;
242+
return serverList + ":" + baseZNode;
243+
}
244+
238245
@Override
239246
public void close() {
240247
zk.close();

0 commit comments

Comments
 (0)