-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Backport "HBASE-26474 Implement connection-level attributes" to branch-2 #4014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
/** | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
|
@@ -20,24 +20,27 @@ | |
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; | ||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY; | ||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; | ||
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan; | ||
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan; | ||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener; | ||
|
||
import io.opentelemetry.api.trace.Span; | ||
import io.opentelemetry.api.trace.StatusCode; | ||
import io.opentelemetry.context.Scope; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import org.apache.hadoop.hbase.HRegionLocation; | ||
import org.apache.hadoop.hbase.RegionLocations; | ||
import org.apache.hadoop.hbase.ServerName; | ||
import org.apache.hadoop.hbase.TableName; | ||
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder; | ||
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; | ||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException; | ||
import org.apache.hadoop.hbase.trace.TraceUtil; | ||
import org.apache.hadoop.hbase.util.Bytes; | ||
|
@@ -95,9 +98,12 @@ private boolean isMeta(TableName tableName) { | |
return TableName.isMetaTableName(tableName); | ||
} | ||
|
||
private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action, | ||
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) { | ||
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName); | ||
private <T> CompletableFuture<T> tracedLocationFuture( | ||
Supplier<CompletableFuture<T>> action, | ||
Function<T, List<String>> getRegionNames, | ||
Supplier<Span> spanSupplier | ||
) { | ||
final Span span = spanSupplier.get(); | ||
try (Scope scope = span.makeCurrent()) { | ||
CompletableFuture<T> future = action.get(); | ||
FutureUtils.addListener(future, (resp, error) -> { | ||
|
@@ -116,18 +122,30 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture | |
} | ||
} | ||
|
||
private List<String> getRegionName(RegionLocations locs) { | ||
List<String> names = new ArrayList<>(); | ||
for (HRegionLocation loc : locs.getRegionLocations()) { | ||
if (loc != null) { | ||
names.add(loc.getRegion().getRegionNameAsString()); | ||
} | ||
static List<String> getRegionNames(RegionLocations locs) { | ||
if (locs == null || locs.getRegionLocations() == null) { | ||
return Collections.emptyList(); | ||
} | ||
return names; | ||
return Arrays.stream(locs.getRegionLocations()) | ||
.filter(Objects::nonNull) | ||
.map(HRegionLocation::getRegion) | ||
.map(RegionInfo::getRegionNameAsString) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
static List<String> getRegionNames(HRegionLocation location) { | ||
return Optional.ofNullable(location) | ||
.map(HRegionLocation::getRegion) | ||
.map(RegionInfo::getRegionNameAsString) | ||
.map(Collections::singletonList) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will net an immutable list whereas There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted. I had not considered mutability of the previous implementation. Consider this change to immutability a happy side-effect. At least on the master PR, this appears to have introduced no issues. Perhaps this is cause for some of the test failures I seem to have caused in the branch-2 backport. Let me investigate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No test failures after reverting the erroneous changes pointed out by Huaxiang, so I'll take that as acceptability of the immutable collection. Shout if that's not okay by you. |
||
.orElseGet(Collections::emptyList); | ||
} | ||
|
||
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, | ||
RegionLocateType type, boolean reload, long timeoutNs) { | ||
final Supplier<Span> supplier = new TableSpanBuilder(conn) | ||
.setName("AsyncRegionLocator.getRegionLocations") | ||
.setTableName(tableName); | ||
return tracedLocationFuture(() -> { | ||
CompletableFuture<RegionLocations> future = isMeta(tableName) ? | ||
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) : | ||
|
@@ -137,11 +155,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[ | |
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + | ||
"ms) waiting for region locations for " + tableName + ", row='" + | ||
Bytes.toStringBinary(row) + "'"); | ||
}, this::getRegionName, tableName, "getRegionLocations"); | ||
}, AsyncRegionLocator::getRegionNames, supplier); | ||
} | ||
|
||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, | ||
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) { | ||
final Supplier<Span> supplier = new TableSpanBuilder(conn) | ||
.setName("AsyncRegionLocator.getRegionLocation") | ||
.setTableName(tableName); | ||
return tracedLocationFuture(() -> { | ||
// meta region can not be split right now so we always call the same method. | ||
// Change it later if the meta table can have more than one regions. | ||
|
@@ -172,8 +193,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] | |
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) + | ||
"ms) waiting for region location for " + tableName + ", row='" + | ||
Bytes.toStringBinary(row) + "', replicaId=" + replicaId); | ||
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName, | ||
"getRegionLocation"); | ||
}, AsyncRegionLocator::getRegionNames, supplier); | ||
} | ||
|
||
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, | ||
|
@@ -201,31 +221,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { | |
} | ||
|
||
void clearCache(TableName tableName) { | ||
Supplier<Span> supplier = new TableSpanBuilder(conn) | ||
.setName("AsyncRegionLocator.clearCache") | ||
.setTableName(tableName); | ||
TraceUtil.trace(() -> { | ||
LOG.debug("Clear meta cache for {}", tableName); | ||
if (tableName.equals(META_TABLE_NAME)) { | ||
metaRegionLocator.clearCache(); | ||
} else { | ||
nonMetaRegionLocator.clearCache(tableName); | ||
} | ||
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName)); | ||
}, supplier); | ||
} | ||
|
||
void clearCache(ServerName serverName) { | ||
Supplier<Span> supplier = new ConnectionSpanBuilder(conn) | ||
.setName("AsyncRegionLocator.clearCache") | ||
.addAttribute(SERVER_NAME_KEY, serverName.getServerName()); | ||
TraceUtil.trace(() -> { | ||
LOG.debug("Clear meta cache for {}", serverName); | ||
metaRegionLocator.clearCache(serverName); | ||
nonMetaRegionLocator.clearCache(serverName); | ||
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); | ||
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY, | ||
serverName.getServerName())); | ||
}, supplier); | ||
} | ||
|
||
void clearCache() { | ||
Supplier<Span> supplier = new ConnectionSpanBuilder(conn) | ||
.setName("AsyncRegionLocator.clearCache"); | ||
TraceUtil.trace(() -> { | ||
metaRegionLocator.clearCache(); | ||
nonMetaRegionLocator.clearCache(); | ||
}, "AsyncRegionLocator.clearCache"); | ||
}, supplier); | ||
} | ||
|
||
AsyncNonMetaRegionLocator getNonMetaRegionLocator() { | ||
|
Uh oh!
There was an error while loading. Please reload this page.