34
34
import java .util .concurrent .ThreadPoolExecutor ;
35
35
import java .util .concurrent .TimeUnit ;
36
36
import java .util .function .Supplier ;
37
+ import org .apache .commons .lang3 .StringUtils ;
37
38
import org .apache .hadoop .conf .Configuration ;
38
39
import org .apache .hadoop .hbase .ServerName ;
40
+ import org .apache .hadoop .hbase .TableName ;
39
41
import org .apache .hadoop .hbase .util .Bytes ;
40
42
import org .apache .hadoop .ipc .RemoteException ;
41
43
import org .apache .yetus .audience .InterfaceAudience ;
51
53
* This class is for maintaining the various connection statistics and publishing them through the
52
54
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
53
55
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
54
- * {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
55
- * instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
56
- * the thread pools they allocate. The metrics reporter will be shutdown {@link #shutdown()} when
57
- * all connections within this metrics instances are closed.
56
+ * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
57
+ * "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
58
+ * terminate the thread pools they allocate. The metrics reporter will be shutdown
59
+ * {@link #shutdown()} when all connections within this metrics instances are closed.
58
60
*/
59
61
@ InterfaceAudience .Private
60
62
public final class MetricsConnection implements StatisticTrackable {
61
63
62
64
private static final ConcurrentMap <String , MetricsConnection > METRICS_INSTANCES =
63
65
new ConcurrentHashMap <>();
64
66
65
- static MetricsConnection getMetricsConnection (final String scope ,
67
+ static MetricsConnection getMetricsConnection (final Configuration conf , final String scope ,
66
68
Supplier <ThreadPoolExecutor > batchPool , Supplier <ThreadPoolExecutor > metaPool ) {
67
69
return METRICS_INSTANCES .compute (scope , (s , metricsConnection ) -> {
68
70
if (metricsConnection == null ) {
69
- MetricsConnection newMetricsConn = new MetricsConnection (scope , batchPool , metaPool );
71
+ MetricsConnection newMetricsConn = new MetricsConnection (conf , scope , batchPool , metaPool );
70
72
newMetricsConn .incrConnectionCount ();
71
73
return newMetricsConn ;
72
74
} else {
@@ -91,6 +93,10 @@ static void deleteMetricsConnection(final String scope) {
91
93
/** Set this key to {@code true} to enable metrics collection of client requests. */
92
94
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable" ;
93
95
96
+ /** Set this key to {@code true} to enable table metrics collection of client requests. */
97
+ public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
98
+ "hbase.client.table.metrics.enable" ;
99
+
94
100
/**
95
101
* Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
96
102
* scope is added to JMX MBean objectName, and defaults to a combination of the Connection's
@@ -311,6 +317,7 @@ private static interface NewMetric<T> {
311
317
private final MetricRegistry registry ;
312
318
private final JmxReporter reporter ;
313
319
private final String scope ;
320
+ private final boolean tableMetricsEnabled ;
314
321
315
322
private final NewMetric <Timer > timerFactory = new NewMetric <Timer >() {
316
323
@ Override
@@ -378,9 +385,10 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
378
385
private final ConcurrentMap <String , Counter > rpcCounters =
379
386
new ConcurrentHashMap <>(CAPACITY , LOAD_FACTOR , CONCURRENCY_LEVEL );
380
387
381
- private MetricsConnection (String scope , Supplier < ThreadPoolExecutor > batchPool ,
382
- Supplier <ThreadPoolExecutor > metaPool ) {
388
+ private MetricsConnection (Configuration conf , String scope ,
389
+ Supplier <ThreadPoolExecutor > batchPool , Supplier < ThreadPoolExecutor > metaPool ) {
383
390
this .scope = scope ;
391
+ this .tableMetricsEnabled = conf .getBoolean (CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY , false );
384
392
addThreadPools (batchPool , metaPool );
385
393
this .registry = new MetricRegistry ();
386
394
this .registry .register (getExecutorPoolName (), new RatioGauge () {
@@ -520,6 +528,16 @@ public ConcurrentMap<String, Counter> getRpcCounters() {
520
528
return rpcCounters ;
521
529
}
522
530
531
+ /** rpcTimers metric */
532
+ public ConcurrentMap <String , Timer > getRpcTimers () {
533
+ return rpcTimers ;
534
+ }
535
+
536
+ /** rpcHistograms metric */
537
+ public ConcurrentMap <String , Histogram > getRpcHistograms () {
538
+ return rpcHistograms ;
539
+ }
540
+
523
541
/** getTracker metric */
524
542
public CallTracker getGetTracker () {
525
543
return getTracker ;
@@ -694,7 +712,8 @@ private void shutdown() {
694
712
}
695
713
696
714
/** Report RPC context to metrics system. */
697
- public void updateRpc (MethodDescriptor method , Message param , CallStats stats , Throwable e ) {
715
+ public void updateRpc (MethodDescriptor method , TableName tableName , Message param ,
716
+ CallStats stats , Throwable e ) {
698
717
int callsPerServer = stats .getConcurrentCallsPerServer ();
699
718
if (callsPerServer > 0 ) {
700
719
concurrentCallsPerServerHist .update (callsPerServer );
@@ -744,29 +763,33 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T
744
763
case 0 :
745
764
assert "Get" .equals (method .getName ());
746
765
getTracker .updateRpc (stats );
766
+ updateTableMetric (methodName .toString (), tableName , stats , e );
747
767
return ;
748
768
case 1 :
749
769
assert "Mutate" .equals (method .getName ());
750
770
final MutationType mutationType = ((MutateRequest ) param ).getMutation ().getMutateType ();
751
771
switch (mutationType ) {
752
772
case APPEND :
753
773
appendTracker .updateRpc (stats );
754
- return ;
774
+ break ;
755
775
case DELETE :
756
776
deleteTracker .updateRpc (stats );
757
- return ;
777
+ break ;
758
778
case INCREMENT :
759
779
incrementTracker .updateRpc (stats );
760
- return ;
780
+ break ;
761
781
case PUT :
762
782
putTracker .updateRpc (stats );
763
- return ;
783
+ break ;
764
784
default :
765
785
throw new RuntimeException ("Unrecognized mutation type " + mutationType );
766
786
}
787
+ updateTableMetric (methodName .toString (), tableName , stats , e );
788
+ return ;
767
789
case 2 :
768
790
assert "Scan" .equals (method .getName ());
769
791
scanTracker .updateRpc (stats );
792
+ updateTableMetric (methodName .toString (), tableName , stats , e );
770
793
return ;
771
794
case 3 :
772
795
assert "BulkLoadHFile" .equals (method .getName ());
@@ -792,6 +815,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T
792
815
assert "Multi" .equals (method .getName ());
793
816
numActionsPerServerHist .update (stats .getNumActionsPerServer ());
794
817
multiTracker .updateRpc (stats );
818
+ updateTableMetric (methodName .toString (), tableName , stats , e );
795
819
return ;
796
820
default :
797
821
throw new RuntimeException ("Unrecognized ClientService RPC type " + method .getFullName ());
@@ -801,6 +825,26 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats, T
801
825
updateRpcGeneric (methodName .toString (), stats );
802
826
}
803
827
828
+ /** Report table rpc context to metrics system. */
829
+ private void updateTableMetric (String methodName , TableName tableName , CallStats stats ,
830
+ Throwable e ) {
831
+ if (tableMetricsEnabled ) {
832
+ if (methodName != null ) {
833
+ String table = tableName != null && StringUtils .isNotEmpty (tableName .getNameAsString ())
834
+ ? tableName .getNameAsString ()
835
+ : "unknown" ;
836
+ String metricKey = methodName + "_" + table ;
837
+ // update table rpc context to metrics system,
838
+ // includes rpc call duration, rpc call request/response size(bytes).
839
+ updateRpcGeneric (metricKey , stats );
840
+ if (e != null ) {
841
+ // rpc failure call counter with table name.
842
+ getMetric (FAILURE_CNT_BASE + metricKey , rpcCounters , counterFactory ).inc ();
843
+ }
844
+ }
845
+ }
846
+ }
847
+
804
848
public void incrCacheDroppingExceptions (Object exception ) {
805
849
getMetric (
806
850
CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception .getClass ().getSimpleName ()),
0 commit comments