Skip to content

Commit 3e3c8fa

Browse files
authored
HBASE-27466: Making metrics instance containing one or more connections. (#4874) (#4909)
Signed-off-by: David Manning <67607031+d-c-manning@users.noreply.github.com> Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent 1bfc58a commit 3e3c8fa

File tree

9 files changed

+288
-68
lines changed

9 files changed

+288
-68
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public class AsyncConnectionImpl implements AsyncConnection {
115115

116116
private volatile boolean closed = false;
117117

118+
private final String metricsScope;
118119
private final Optional<MetricsConnection> metrics;
119120

120121
private final ClusterStatusListener clusterStatusListener;
@@ -123,15 +124,16 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
123124
User user) {
124125
this.conf = conf;
125126
this.user = user;
127+
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
126128

127129
if (user.isLoginFromKeytab()) {
128130
spawnRenewalChore(user.getUGI());
129131
}
130132
this.connConf = new AsyncConnectionConfiguration(conf);
131133
this.registry = registry;
132134
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
133-
String scope = MetricsConnection.getScope(conf, clusterId, this);
134-
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
135+
this.metrics =
136+
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
135137
} else {
136138
this.metrics = Optional.empty();
137139
}
@@ -226,7 +228,9 @@ public void close() {
226228
choreService = null;
227229
}
228230
}
229-
metrics.ifPresent(MetricsConnection::shutdown);
231+
if (metrics.isPresent()) {
232+
MetricsConnection.deleteMetricsConnection(metricsScope);
233+
}
230234
closed = true;
231235
}, "AsyncConnection.close");
232236
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
224224
private final RpcClient rpcClient;
225225

226226
private final MetaCache metaCache;
227+
228+
private String metricsScope = null;
227229
private final MetricsConnection metrics;
228230

229231
protected User user;
@@ -312,8 +314,9 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
312314
retrieveClusterId();
313315

314316
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
315-
String scope = MetricsConnection.getScope(conf, clusterId, this);
316-
this.metrics = new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
317+
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
318+
this.metrics = MetricsConnection.getMetricsConnection(this.metricsScope, this::getBatchPool,
319+
this::getMetaLookupPool);
317320
} else {
318321
this.metrics = null;
319322
}
@@ -2131,7 +2134,7 @@ public void close() {
21312134
closeMaster();
21322135
shutdownPools();
21332136
if (this.metrics != null) {
2134-
this.metrics.shutdown();
2137+
MetricsConnection.deleteMetricsConnection(metricsScope);
21352138
}
21362139
this.closed = true;
21372140
if (this.registry != null) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java

Lines changed: 201 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.codahale.metrics.MetricRegistry;
2727
import com.codahale.metrics.RatioGauge;
2828
import com.codahale.metrics.Timer;
29+
import java.util.ArrayList;
30+
import java.util.List;
2931
import java.util.concurrent.ConcurrentHashMap;
3032
import java.util.concurrent.ConcurrentMap;
3133
import java.util.concurrent.ConcurrentSkipListMap;
@@ -47,12 +49,43 @@
4749
/**
4850
* This class is for maintaining the various connection statistics and publishing them through the
4951
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
50-
* as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
51-
* this class implicitly creates and "starts" instances of these classes; be sure to call
52-
* {@link #shutdown()} to terminate the thread pools they allocate.
52+
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
53+
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
54+
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
55+
* the thread pools they allocate. The metrics reporter is shut down via {@link #shutdown()} when
56+
* all connections within this metrics instance are closed.
5357
*/
5458
@InterfaceAudience.Private
55-
public class MetricsConnection implements StatisticTrackable {
59+
public final class MetricsConnection implements StatisticTrackable {
60+
61+
private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
62+
new ConcurrentHashMap<>();
63+
64+
static MetricsConnection getMetricsConnection(final String scope,
65+
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
66+
return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
67+
if (metricsConnection == null) {
68+
MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
69+
newMetricsConn.incrConnectionCount();
70+
return newMetricsConn;
71+
} else {
72+
metricsConnection.addThreadPools(batchPool, metaPool);
73+
metricsConnection.incrConnectionCount();
74+
return metricsConnection;
75+
}
76+
});
77+
}
78+
79+
static void deleteMetricsConnection(final String scope) {
80+
METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
81+
metricsConnection.decrConnectionCount();
82+
if (metricsConnection.getConnectionCount() == 0) {
83+
metricsConnection.shutdown();
84+
return null;
85+
}
86+
return metricsConnection;
87+
});
88+
}
5689

5790
/** Set this key to {@code true} to enable metrics collection of client requests. */
5891
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
@@ -231,7 +264,7 @@ public void updateDelayInterval(long interval) {
231264
}
232265
}
233266

234-
protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
267+
private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
235268
new ConcurrentHashMap<>();
236269

237270
public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
@@ -272,7 +305,7 @@ private static interface NewMetric<T> {
272305

273306
private final MetricRegistry registry;
274307
private final JmxReporter reporter;
275-
protected final String scope;
308+
private final String scope;
276309

277310
private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
278311
@Override
@@ -295,66 +328,93 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
295328
}
296329
};
297330

331+
// Thread pool suppliers, one entry per connection that shares this metrics instance.
332+
private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
333+
private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();
334+
298335
// static metrics
299336

300-
protected final Counter metaCacheHits;
301-
protected final Counter metaCacheMisses;
302-
protected final CallTracker getTracker;
303-
protected final CallTracker scanTracker;
304-
protected final CallTracker appendTracker;
305-
protected final CallTracker deleteTracker;
306-
protected final CallTracker incrementTracker;
307-
protected final CallTracker putTracker;
308-
protected final CallTracker multiTracker;
309-
protected final RunnerStats runnerStats;
310-
protected final Counter metaCacheNumClearServer;
311-
protected final Counter metaCacheNumClearRegion;
312-
protected final Counter hedgedReadOps;
313-
protected final Counter hedgedReadWin;
314-
protected final Histogram concurrentCallsPerServerHist;
315-
protected final Histogram numActionsPerServerHist;
316-
protected final Counter nsLookups;
317-
protected final Counter nsLookupsFailed;
318-
protected final Timer overloadedBackoffTimer;
337+
private final Counter connectionCount;
338+
private final Counter metaCacheHits;
339+
private final Counter metaCacheMisses;
340+
private final CallTracker getTracker;
341+
private final CallTracker scanTracker;
342+
private final CallTracker appendTracker;
343+
private final CallTracker deleteTracker;
344+
private final CallTracker incrementTracker;
345+
private final CallTracker putTracker;
346+
private final CallTracker multiTracker;
347+
private final RunnerStats runnerStats;
348+
private final Counter metaCacheNumClearServer;
349+
private final Counter metaCacheNumClearRegion;
350+
private final Counter hedgedReadOps;
351+
private final Counter hedgedReadWin;
352+
private final Histogram concurrentCallsPerServerHist;
353+
private final Histogram numActionsPerServerHist;
354+
private final Counter nsLookups;
355+
private final Counter nsLookupsFailed;
356+
private final Timer overloadedBackoffTimer;
319357

320358
// dynamic metrics
321359

322360
// These maps are used to cache references to the metric instances that are managed by the
323361
// registry. I don't think their use perfectly removes redundant allocations, but it's
324362
// a big improvement over calling registry.newMetric each time.
325-
protected final ConcurrentMap<String, Timer> rpcTimers =
363+
private final ConcurrentMap<String, Timer> rpcTimers =
326364
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
327-
protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
365+
private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
328366
CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
329367
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
330368
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
331-
protected final ConcurrentMap<String, Counter> rpcCounters =
369+
private final ConcurrentMap<String, Counter> rpcCounters =
332370
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
333371

334-
MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
372+
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
335373
Supplier<ThreadPoolExecutor> metaPool) {
336374
this.scope = scope;
375+
addThreadPools(batchPool, metaPool);
337376
this.registry = new MetricRegistry();
338377
this.registry.register(getExecutorPoolName(), new RatioGauge() {
339378
@Override
340379
protected Ratio getRatio() {
341-
ThreadPoolExecutor pool = batchPool.get();
342-
if (pool == null) {
343-
return Ratio.of(0, 0);
380+
int numerator = 0;
381+
int denominator = 0;
382+
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
383+
ThreadPoolExecutor pool = poolSupplier.get();
384+
if (pool != null) {
385+
int activeCount = pool.getActiveCount();
386+
int maxPoolSize = pool.getMaximumPoolSize();
387+
/* The max thread usage ratio among batch pools of all connections */
388+
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
389+
numerator = activeCount;
390+
denominator = maxPoolSize;
391+
}
392+
}
344393
}
345-
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
394+
return Ratio.of(numerator, denominator);
346395
}
347396
});
348397
this.registry.register(getMetaPoolName(), new RatioGauge() {
349398
@Override
350399
protected Ratio getRatio() {
351-
ThreadPoolExecutor pool = metaPool.get();
352-
if (pool == null) {
353-
return Ratio.of(0, 0);
400+
int numerator = 0;
401+
int denominator = 0;
402+
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
403+
ThreadPoolExecutor pool = poolSupplier.get();
404+
if (pool != null) {
405+
int activeCount = pool.getActiveCount();
406+
int maxPoolSize = pool.getMaximumPoolSize();
407+
/* The max thread usage ratio among meta lookup pools of all connections */
408+
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
409+
numerator = activeCount;
410+
denominator = maxPoolSize;
411+
}
412+
}
354413
}
355-
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
414+
return Ratio.of(numerator, denominator);
356415
}
357416
});
417+
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
358418
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
359419
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
360420
this.metaCacheNumClearServer =
@@ -397,8 +457,84 @@ MetricRegistry getMetricRegistry() {
397457
return registry;
398458
}
399459

400-
public void shutdown() {
401-
this.reporter.stop();
460+
/** scope of the metrics object */
461+
public String getMetricScope() {
462+
return scope;
463+
}
464+
465+
/** serverStats metric */
466+
public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
467+
return serverStats;
468+
}
469+
470+
/** runnerStats metric */
471+
public RunnerStats getRunnerStats() {
472+
return runnerStats;
473+
}
474+
475+
/** metaCacheNumClearServer metric */
476+
public Counter getMetaCacheNumClearServer() {
477+
return metaCacheNumClearServer;
478+
}
479+
480+
/** metaCacheNumClearRegion metric */
481+
public Counter getMetaCacheNumClearRegion() {
482+
return metaCacheNumClearRegion;
483+
}
484+
485+
/** hedgedReadOps metric */
486+
public Counter getHedgedReadOps() {
487+
return hedgedReadOps;
488+
}
489+
490+
/** hedgedReadWin metric */
491+
public Counter getHedgedReadWin() {
492+
return hedgedReadWin;
493+
}
494+
495+
/** numActionsPerServerHist metric */
496+
public Histogram getNumActionsPerServerHist() {
497+
return numActionsPerServerHist;
498+
}
499+
500+
/** rpcCounters metric */
501+
public ConcurrentMap<String, Counter> getRpcCounters() {
502+
return rpcCounters;
503+
}
504+
505+
/** getTracker metric */
506+
public CallTracker getGetTracker() {
507+
return getTracker;
508+
}
509+
510+
/** scanTracker metric */
511+
public CallTracker getScanTracker() {
512+
return scanTracker;
513+
}
514+
515+
/** multiTracker metric */
516+
public CallTracker getMultiTracker() {
517+
return multiTracker;
518+
}
519+
520+
/** appendTracker metric */
521+
public CallTracker getAppendTracker() {
522+
return appendTracker;
523+
}
524+
525+
/** deleteTracker metric */
526+
public CallTracker getDeleteTracker() {
527+
return deleteTracker;
528+
}
529+
530+
/** incrementTracker metric */
531+
public CallTracker getIncrementTracker() {
532+
return incrementTracker;
533+
}
534+
535+
/** putTracker metric */
536+
public CallTracker getPutTracker() {
537+
return putTracker;
402538
}
403539

404540
/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
@@ -457,6 +593,28 @@ public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
457593
overloadedBackoffTimer.update(time, timeUnit);
458594
}
459595

596+
/** Return the connection count of the metrics within a scope */
597+
public long getConnectionCount() {
598+
return connectionCount.getCount();
599+
}
600+
601+
/** Increment the connection count of the metrics within a scope */
602+
private void incrConnectionCount() {
603+
connectionCount.inc();
604+
}
605+
606+
/** Decrement the connection count of the metrics within a scope */
607+
private void decrConnectionCount() {
608+
connectionCount.dec();
609+
}
610+
611+
/** Add thread pools of additional connections to the metrics */
612+
private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
613+
Supplier<ThreadPoolExecutor> metaPool) {
614+
batchPools.add(batchPool);
615+
metaPools.add(metaPool);
616+
}
617+
460618
/**
461619
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
462620
*/
@@ -474,6 +632,10 @@ private void updateRpcGeneric(String methodName, CallStats stats) {
474632
.update(stats.getResponseSizeBytes());
475633
}
476634

635+
private void shutdown() {
636+
this.reporter.stop();
637+
}
638+
477639
/** Report RPC context to metrics system. */
478640
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
479641
int callsPerServer = stats.getConcurrentCallsPerServer();

0 commit comments

Comments
 (0)