Skip to content

HBASE-26474 Implement connection-level attributes #3952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public CompletableFuture<ServerName> getActiveMaster() {
getClass().getSimpleName() + ".getClusterId");
}

@Override
public String getConnectionString() {
return "unimplemented";
}

@Override
public void close() {
trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncConnection {
public class AsyncConnectionImpl implements AsyncConnection {

private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

Expand Down Expand Up @@ -198,6 +198,14 @@ synchronized ChoreService getChoreService() {
return choreService;
}

public User getUser() {
return user;
}

public ConnectionRegistry getConnectionRegistry() {
return registry;
}

@Override
public Configuration getConfiguration() {
return conf;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,24 +20,27 @@
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -96,9 +99,12 @@ private boolean isMeta(TableName tableName) {
return TableName.isMetaTableName(tableName);
}

private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
private <T> CompletableFuture<T> tracedLocationFuture(
Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames,
Supplier<Span> spanSupplier
) {
final Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
Expand All @@ -117,18 +123,30 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
}
}

private List<String> getRegionName(RegionLocations locs) {
List<String> names = new ArrayList<>();
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
names.add(loc.getRegion().getRegionNameAsString());
}
private static List<String> getRegionNames(RegionLocations locs) {
if (locs == null || locs.getRegionLocations() == null) {
return Collections.emptyList();
}
return names;
return Arrays.stream(locs.getRegionLocations())
.filter(Objects::nonNull)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.collect(Collectors.toList());
}

private static List<String> getRegionNames(HRegionLocation location) {
return Optional.ofNullable(location)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.map(Collections::singletonList)
.orElseGet(Collections::emptyList);
}

CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocations")
.setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
Expand All @@ -138,11 +156,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}, this::getRegionName, tableName, "getRegionLocations");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.getRegionLocation")
.setTableName(tableName);
return tracedLocationFuture(() -> {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
Expand Down Expand Up @@ -173,8 +194,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
"getRegionLocation");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
Expand Down Expand Up @@ -202,31 +222,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
}

void clearCache(TableName tableName) {
Supplier<Span> supplier = new TableSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
metaRegionLocator.clearCache();
} else {
nonMetaRegionLocator.clearCache(tableName);
}
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}, supplier);
}

void clearCache(ServerName serverName) {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache")
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", serverName);
metaRegionLocator.clearCache(serverName);
nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
}, supplier);
}

void clearCache() {
Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
.setName("AsyncRegionLocator.clearCache");
TraceUtil.trace(() -> {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}, "AsyncRegionLocator.clearCache");
}, supplier);
}

AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Internal use only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
public interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region(s).
Expand All @@ -48,6 +48,13 @@ interface ConnectionRegistry extends Closeable {
*/
CompletableFuture<ServerName> getActiveMaster();

/**
* Return the connection string associated with this registry instance. This value is
* informational, used for annotating traces. Values returned may not be valid for establishing a
* working cluster connection.
*/
String getConnectionString();

/**
* Closes this instance and releases any system resources associated with it
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
return masterAddrs;
}

private final String connectionString;

MasterRegistry(Configuration conf) throws IOException {
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
connectionString = getConnectionString(conf);
}

@Override
Expand All @@ -102,6 +105,15 @@ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
return getMasters();
}

@Override
public String getConnectionString() {
return connectionString;
}

static String getConnectionString(Configuration conf) throws UnknownHostException {
return getMasterAddr(conf);
}

/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private CompletableFuture<Result> get(Get get, int replicaId) {
}

private TableOperationSpanBuilder newTableOperationSpanBuilder() {
return new TableOperationSpanBuilder().setTableName(tableName);
return new TableOperationSpanBuilder(conn).setTableName(tableName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,9 +73,23 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

private static final char ADDRS_CONF_SEPARATOR = ',';

private final String connectionString;

RpcConnectionRegistry(Configuration conf) throws IOException {
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
MIN_SECS_BETWEEN_REFRESHES);
connectionString = buildConnectionString(conf);
}

private String buildConnectionString(Configuration conf) throws UnknownHostException {
final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
if (StringUtils.isBlank(configuredBootstrapNodes)) {
return MasterRegistry.getConnectionString(conf);
}
return Splitter.on(ADDRS_CONF_SEPARATOR)
.trimResults()
.splitToStream(configuredBootstrapNodes)
.collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR)));
}

@Override
Expand All @@ -91,6 +106,11 @@ protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOExcepti
}
}

@Override
public String getConnectionString() {
return connectionString;
}

private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ public CompletableFuture<ServerName> getActiveMaster() {
"ZKConnectionRegistry.getActiveMaster");
}

@Override
public String getConnectionString() {
final String serverList = zk.getConnectString();
final String baseZNode = znodePaths.baseZNode;
return serverList + ":" + baseZNode;
}

@Override
public void close() {
zk.close();
Expand Down
Loading