Skip to content

HBASE-27890 Expose a getter on Connection/AsyncConnection for getting… #5257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,9 @@ default AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorSer
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
Hbck getHbck(ServerName masterServer) throws IOException;

/**
 * Returns a statistical sample of {@link MetricsConnection}.
 * <p>
 * This default implementation always returns {@code null}, so callers must be prepared for a
 * {@code null} result unless the concrete implementation overrides this method.
 */
default MetricsConnectionSnapshot getMetrics() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,4 +453,9 @@ public Hbck getHbck(ServerName masterServer) {
Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}

/**
 * Returns a snapshot of the connection metrics, or {@code null} when the {@code metrics}
 * optional is empty.
 */
@Override
public MetricsConnectionSnapshot getMetrics() {
  if (!metrics.isPresent()) {
    return null;
  }
  return metrics.get().snapshot();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,9 @@ default Hbck getHbck() throws IOException {
default Hbck getHbck(ServerName masterServer) throws IOException {
return toAsyncConnection().getHbck(masterServer);
}

/**
 * Returns a statistical sample of {@link MetricsConnection}.
 * <p>
 * This default implementation always returns {@code null}, so callers must be prepared for a
 * {@code null} result unless the concrete implementation overrides this method.
 */
default MetricsConnectionSnapshot getMetrics() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,9 @@ public Hbck getHbck(ServerName masterServer) throws IOException {
public String toString() {
return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
}

/**
 * Returns whatever {@code conn.getMetrics()} returns; the result is passed through unchanged.
 */
@Override
public MetricsConnectionSnapshot getMetrics() {
return conn.getMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.client.MetricsConnectionSnapshot.snapshotMap;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
Expand All @@ -36,6 +37,9 @@
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnectionSnapshot.CallTrackerSnapshot;
import org.apache.hadoop.hbase.client.MetricsConnectionSnapshot.RegionStatsSnapshot;
import org.apache.hadoop.hbase.client.MetricsConnectionSnapshot.RunnerStatsSnapshot;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -219,6 +223,10 @@ public void updateRpc(CallStats stats) {
this.respHist.update(stats.getResponseSizeBytes());
}

/** Returns the {@code name} of this tracker, the same value that {@code toString()} appends. */
String getName() {
return name;
}

@Override
public String toString() {
return "CallTracker:" + name;
Expand Down Expand Up @@ -247,12 +255,12 @@ public void update(RegionLoadStats regionStatistics) {
protected static class RunnerStats {
final Counter normalRunners;
final Counter delayRunners;
final Histogram delayIntevalHist;
final Histogram delayIntervalHist;

public RunnerStats(MetricRegistry registry) {
this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
this.delayIntevalHist =
this.delayIntervalHist =
registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
}

Expand All @@ -265,11 +273,11 @@ public void incrDelayRunners() {
}

public void updateDelayInterval(long interval) {
this.delayIntevalHist.update(interval);
this.delayIntervalHist.update(interval);
}
}

private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
private final ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
new ConcurrentHashMap<>();

public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
Expand Down Expand Up @@ -359,6 +367,8 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
private final Counter nsLookups;
private final Counter nsLookupsFailed;
private final Timer overloadedBackoffTimer;
private final RatioGauge executorPoolUsageRatio;
private final RatioGauge metaPoolUsageRatio;

// dynamic metrics

Expand All @@ -379,46 +389,20 @@ private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
this.scope = scope;
addThreadPools(batchPool, metaPool);
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(), new RatioGauge() {
this.executorPoolUsageRatio = new RatioGauge() {
@Override
protected Ratio getRatio() {
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among batch pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(numerator, denominator);
return calculateThreadPoolUsageRatio(batchPools);
}
});
this.registry.register(getMetaPoolName(), new RatioGauge() {
};
this.metaPoolUsageRatio = new RatioGauge() {
@Override
protected Ratio getRatio() {
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among meta lookup pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(numerator, denominator);
return calculateThreadPoolUsageRatio(metaPools);
}
});
};
this.registry.register(getExecutorPoolName(), executorPoolUsageRatio);
this.registry.register(getMetaPoolName(), metaPoolUsageRatio);
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
Expand Down Expand Up @@ -745,4 +729,51 @@ public void incrNsLookups() {
public void incrNsLookupsFailed() {
this.nsLookupsFailed.inc();
}

/**
 * Computes the highest thread usage ratio (active threads over maximum pool size) among the
 * given thread pools.
 * <p>
 * Suppliers that currently yield {@code null} are skipped. When no pool is available, both the
 * numerator and the denominator stay at 0 and the ratio is built from those values.
 * @param pools suppliers of the thread pools to inspect
 * @return the ratio of the pool with the highest usage
 */
private RatioGauge.Ratio
  calculateThreadPoolUsageRatio(Iterable<Supplier<ThreadPoolExecutor>> pools) {
  int numerator = 0;
  int denominator = 0;
  for (Supplier<ThreadPoolExecutor> poolSupplier : pools) {
    ThreadPoolExecutor pool = poolSupplier.get();
    if (pool != null) {
      int activeCount = pool.getActiveCount();
      int maxPoolSize = pool.getMaximumPoolSize();
      /*
       * The max thread usage ratio among pools of all connections. The two fractions are compared
       * by cross-multiplication; the products are computed as long because a pool's maximum size
       * may be as large as Integer.MAX_VALUE, which would overflow an int multiplication and
       * produce a wrong comparison.
       */
      if (
        numerator == 0 || ((long) numerator * maxPoolSize) < ((long) activeCount * denominator)
      ) {
        numerator = activeCount;
        denominator = maxPoolSize;
      }
    }
  }
  return RatioGauge.Ratio.of(numerator, denominator);
}

/**
 * Builds a {@link MetricsConnectionSnapshot} from the current values of this object's metrics.
 * <p>
 * Counters are read with {@code getCount()}, histograms and timers with {@code getSnapshot()},
 * the two pool usage gauges with {@code getValue()}, the call trackers and runner stats through
 * their respective {@code snapshot(...)} helpers, and the per-key metric maps through
 * {@code snapshotMap(...)}.
 * <p>
 * The method is {@code synchronized}, so concurrent calls to it on the same instance are
 * serialized. NOTE(review): the individual metrics are read one after another, so the values in
 * one snapshot are presumably not guaranteed to be mutually consistent if other threads update
 * metrics meanwhile; confirm whether callers rely on that.
 * @return the newly built snapshot
 */
synchronized MetricsConnectionSnapshot snapshot() {
return MetricsConnectionSnapshot.newBuilder().connectionCount(connectionCount.getCount())
.metaCacheHits(metaCacheHits.getCount()).metaCacheMisses(metaCacheMisses.getCount())
.getTrackerSnap(CallTrackerSnapshot.snapshot(getTracker))
.scanTrackerSnap(CallTrackerSnapshot.snapshot(scanTracker))
.appendTrackerSnap(CallTrackerSnapshot.snapshot(appendTracker))
.deleteTrackerSnap(CallTrackerSnapshot.snapshot(deleteTracker))
.incrementTrackerSnap(CallTrackerSnapshot.snapshot(incrementTracker))
.putTrackerSnap(CallTrackerSnapshot.snapshot(putTracker))
.multiTrackerSnap(CallTrackerSnapshot.snapshot(multiTracker))
.runnerStatsSnap(RunnerStatsSnapshot.snapshot(runnerStats))
.metaCacheNumClearServer(metaCacheNumClearServer.getCount())
.metaCacheNumClearRegion(metaCacheNumClearRegion.getCount())
.hedgedReadOps(hedgedReadOps.getCount()).hedgedReadWin(hedgedReadWin.getCount())
.concurrentCallsPerServerHistSnap(concurrentCallsPerServerHist.getSnapshot())
.numActionsPerServerHistSnap(numActionsPerServerHist.getSnapshot())
.nsLookups(nsLookups.getCount()).nsLookupsFailed(nsLookupsFailed.getCount())
.overloadedBackoffTimerSnap(overloadedBackoffTimer.getSnapshot())
.executorPoolUsageRatio(executorPoolUsageRatio.getValue())
.metaPoolUsageRatio(metaPoolUsageRatio.getValue())
.rpcTimersSnap(snapshotMap(rpcTimers, Timer::getSnapshot))
.rpcHistSnap(snapshotMap(rpcHistograms, Histogram::getSnapshot))
.cacheDroppingExceptions(snapshotMap(cacheDroppingExceptions, Counter::getCount))
// serverStats is a map of maps: each inner per-region map is itself converted via snapshotMap
.rpcCounters(snapshotMap(rpcCounters, Counter::getCount)).serverStats(snapshotMap(serverStats,
regionStatsMap -> snapshotMap(regionStatsMap, RegionStatsSnapshot::snapshot)))
.build();
}
}
Loading