Skip to content

Commit

Permalink
feat: add client-core metrics to veneer (#2978)
Browse files Browse the repository at this point in the history
* feature: add client-core metrics to veneer

* update

* fix test

* fix test

* try fix flaky test again

* trying again

* rename class

* close batcher

* updates based on review

* remove channel interceptor

* fix deps

Co-authored-by: Mattie Fu <mattiefu@google.com>
  • Loading branch information
igorbernstein2 and mutianf authored May 24, 2021
1 parent 31719ad commit 68c2773
Show file tree
Hide file tree
Showing 8 changed files with 344 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,58 @@
package com.google.cloud.bigtable.hbase.wrappers.veneer;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.hbase.wrappers.AdminClientWrapper;
import com.google.cloud.bigtable.hbase.wrappers.BigtableApi;
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics.MetricLevel;
import com.google.cloud.bigtable.metrics.Counter;
import java.io.IOException;

/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
public class BigtableVeneerApi extends BigtableApi {

private final Counter activeSessions =
BigtableClientMetrics.counter(MetricLevel.Info, "session.active");

private static final SharedDataClientWrapperFactory sharedClientFactory =
new SharedDataClientWrapperFactory();
private final DataClientWrapper dataClientWrapper;
private final AdminClientWrapper adminClientWrapper;
private final int channelPoolSize;

/**
 * Builds the veneer-backed Bigtable API surface and registers client-core gauges.
 *
 * <p>Increments the {@code session.active} counter for the lifetime of this instance, and — when
 * a dedicated (non-cached) data client is created — one {@code grpc.channel.active} increment per
 * channel in the pool, unwound again in {@code close()}.
 *
 * @param settings resolved HBase-over-veneer settings used to build the data and admin clients
 * @throws IOException if any underlying veneer client fails to initialize
 */
public BigtableVeneerApi(BigtableHBaseVeneerSettings settings) throws IOException {
  super(settings);

  // active channel count is hard coded at client creation time based on the setting. If
  // transportChannelProvider in the data setting is not InstantiatingGrpcChannelProvider, this
  // count will not be present. If channel pool caching is enabled, channel pool size is
  // calculated in SharedDataClientWrapperFactory to avoid incrementing/decrementing the same
  // channel multiple times for the same key.
  if (settings.isChannelPoolCachingEnabled()) {
    // The shared factory owns the channel metrics; 0 here makes close() decrement nothing.
    dataClientWrapper = sharedClientFactory.createDataClient(settings);
    channelPoolSize = 0;
  } else {
    dataClientWrapper =
        new DataClientVeneerApi(BigtableDataClient.create(settings.getDataSettings()));
    channelPoolSize = getChannelPoolSize(settings.getDataSettings().getStubSettings());
    for (int i = 0; i < channelPoolSize; i++) {
      BigtableClientMetrics.counter(MetricLevel.Info, "grpc.channel.active").inc();
    }
  }
  BigtableInstanceAdminSettings instanceAdminSettings = settings.getInstanceAdminSettings();
  adminClientWrapper =
      new AdminClientVeneerApi(
          BigtableTableAdminClient.create(settings.getTableAdminSettings()),
          BigtableInstanceAdminClient.create(instanceAdminSettings));
  activeSessions.inc();
}

@Override
Expand All @@ -63,5 +84,18 @@ public DataClientWrapper getDataClient() {
/**
 * Releases the data and admin clients and unwinds the metrics registered at construction.
 *
 * <p>Nested try/finally guarantees that a failure closing the data client does not leak the
 * admin client, and that the {@code session.active} and {@code grpc.channel.active} counters are
 * decremented even when a close throws (the original fell through and leaked both on failure).
 *
 * @throws IOException if closing either wrapped client fails
 */
public void close() throws IOException {
  try {
    dataClientWrapper.close();
  } finally {
    try {
      adminClientWrapper.close();
    } finally {
      activeSessions.dec();
      for (int i = 0; i < channelPoolSize; i++) {
        BigtableClientMetrics.counter(MetricLevel.Info, "grpc.channel.active").dec();
      }
    }
  }
}

/**
 * Best-effort extraction of the gRPC channel pool size from the stub settings.
 *
 * @param stubSettings the data client's stub settings
 * @return the configured pool size, or 0 when the transport provider is not an
 *     {@code InstantiatingGrpcChannelProvider} (the size is unknowable in that case)
 */
static int getChannelPoolSize(EnhancedBigtableStubSettings stubSettings) {
  // Guard clause: only the standard instantiating gRPC provider exposes a pool size.
  if (!(stubSettings.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider)) {
    return 0;
  }
  InstantiatingGrpcChannelProvider grpcProvider =
      (InstantiatingGrpcChannelProvider) stubSettings.getTransportChannelProvider();
  return grpcProvider.toBuilder().getPoolSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
import com.google.api.gax.batching.Batcher;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics.MetricLevel;
import com.google.cloud.bigtable.metrics.Meter;
import com.google.common.base.Preconditions;
import java.io.IOException;

/** For internal use only - public for technical reasons. */
@InternalApi("For internal usage only")
public class BulkMutationVeneerApi implements BulkMutationWrapper {

private final Meter mutationAdded =
BigtableClientMetrics.meter(MetricLevel.Info, "bulk-mutator.mutations.added");
private final Batcher<RowMutationEntry, Void> bulkMutateBatcher;

BulkMutationVeneerApi(Batcher<RowMutationEntry, Void> bulkMutateBatcher) {
Expand All @@ -37,6 +42,7 @@ public class BulkMutationVeneerApi implements BulkMutationWrapper {
/**
 * Queues one row mutation into the underlying batcher and counts it.
 *
 * @param rowMutation the entry to batch; must not be null
 * @return a future that completes when the batched mutation has been applied
 */
@Override
public synchronized ApiFuture<Void> add(RowMutationEntry rowMutation) {
  // Fail fast at the call site rather than inside the batcher.
  Preconditions.checkNotNull(rowMutation, "mutation details cannot be null");
  // Count the mutation before handing it off; if add() throws, the mark has already happened.
  mutationAdded.mark();
  return bulkMutateBatcher.add(rowMutation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper;
import com.google.cloud.bigtable.hbase.wrappers.BulkReadWrapper;
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.Meter;
import com.google.cloud.bigtable.metrics.Timer;
import com.google.cloud.bigtable.metrics.Timer.Context;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -154,6 +158,12 @@ protected void onCompleteImpl() {
/** wraps {@link ServerStream} onto HBase {@link ResultScanner}. */
private static class RowResultScanner extends AbstractClientScanner {

private final Meter scannerResultMeter =
BigtableClientMetrics.meter(BigtableClientMetrics.MetricLevel.Info, "scanner.results");
private final Timer scannerResultTimer =
BigtableClientMetrics.timer(
BigtableClientMetrics.MetricLevel.Debug, "scanner.results.latency");

private final ServerStream<Result> serverStream;
private final Iterator<Result> iterator;

Expand All @@ -164,12 +174,15 @@ private static class RowResultScanner extends AbstractClientScanner {

@Override
public Result next() {
if (!iterator.hasNext()) {
// null signals EOF
return null;
try (Context ignored = scannerResultTimer.time()) {
if (!iterator.hasNext()) {
// null signals EOF
return null;
}

scannerResultMeter.mark();
return iterator.next();
}

return iterator.next();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.cloud.bigtable.data.v2.BigtableDataSettings.Builder;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics.MetricLevel;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.IOException;
Expand All @@ -43,6 +45,8 @@ class SharedDataClientWrapperFactory {
private final Map<Key, ClientContext> cachedContexts = new HashMap<>();
private final Map<Key, Integer> refCounts = new HashMap<>();

private final Map<Key, Integer> channelPoolSizes = new HashMap<>();

synchronized DataClientWrapper createDataClient(BigtableHBaseVeneerSettings settings)
throws IOException {
Preconditions.checkArgument(
Expand All @@ -54,9 +58,15 @@ synchronized DataClientWrapper createDataClient(BigtableHBaseVeneerSettings sett
// Get or create ClientContext that will contained the shared resources
ClientContext sharedCtx = cachedContexts.get(key);
if (sharedCtx == null) {
sharedCtx = ClientContext.create(settings.getDataSettings().getStubSettings());
EnhancedBigtableStubSettings stubSettings = settings.getDataSettings().getStubSettings();
sharedCtx = ClientContext.create(stubSettings);
cachedContexts.put(key, sharedCtx);
refCounts.put(key, 0);
int channelPoolSize = BigtableVeneerApi.getChannelPoolSize(stubSettings);
for (int i = 0; i < channelPoolSize; i++) {
BigtableClientMetrics.counter(MetricLevel.Info, "grpc.channel.active").inc();
}
channelPoolSizes.put(key, channelPoolSize);
}
// Increment the count
refCounts.put(key, refCounts.get(key) + 1);
Expand Down Expand Up @@ -92,6 +102,10 @@ synchronized void release(Key key) {

refCounts.remove(key);
ClientContext clientContext = cachedContexts.remove(key);
for (int i = 0; i < channelPoolSizes.get(key); i++) {
BigtableClientMetrics.counter(MetricLevel.Info, "grpc.channel.active").dec();
}
channelPoolSizes.remove(key);
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics;
import com.google.cloud.bigtable.metrics.BigtableClientMetrics.MetricLevel;
import com.google.cloud.bigtable.metrics.Counter;
import com.google.cloud.bigtable.metrics.Meter;
import com.google.cloud.bigtable.metrics.RpcMetrics;
import com.google.cloud.bigtable.metrics.Timer;
import com.google.cloud.bigtable.metrics.Timer.Context;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicBoolean;
import org.threeten.bp.Duration;

/*
Expand All @@ -29,16 +37,31 @@
@InternalApi
public class MetricsApiTracerAdapter implements ApiTracer {

private final Timer firstResponseLatencyTimer =
BigtableClientMetrics.timer(MetricLevel.Info, "grpc.method.ReadRows.firstResponse.latency");
private final Counter activeRpcCounter =
BigtableClientMetrics.counter(MetricLevel.Info, "grpc.rpc.active");
private final Meter rpcMeter =
BigtableClientMetrics.meter(MetricLevel.Info, "grpc.rpc.performed");

private final RpcMetrics rpcMetrics;
private final Context operationTimer;

private volatile Context rpcTimer;
private volatile RetryStatus lastRetryStatus;

// Guards the one-shot close of firstResponseTimer; see responseReceived().
private final AtomicBoolean firstResponseRecorded;
// Set in the constructor only for streaming ReadRows; null otherwise.
private volatile Context firstResponseTimer;

/**
 * Starts operation-level timing and, for streaming ReadRows calls, a first-response latency
 * timer that {@code responseReceived()} stops on the first response.
 *
 * <p>This span reconstructs the post-commit fields/constructor; the scrape left the removed
 * single-argument constructor signature interleaved with the new code.
 *
 * @param rpcMetrics per-method metrics recorder supplied by the tracer factory
 * @param methodName gRPC method name, e.g. "ReadRows"
 * @param operationType gax operation type; ServerStreaming triggers the first-response timer
 */
public MetricsApiTracerAdapter(
    RpcMetrics rpcMetrics, String methodName, OperationType operationType) {
  this.rpcMetrics = rpcMetrics;
  operationTimer = rpcMetrics.timeOperation();
  lastRetryStatus = RetryStatus.PERMANENT_FAILURE;
  // Constant-first equals avoids an NPE if a caller ever passes a null method name.
  if ("ReadRows".equals(methodName) && operationType == OperationType.ServerStreaming) {
    this.firstResponseTimer = firstResponseLatencyTimer.time();
  }
  firstResponseRecorded = new AtomicBoolean(false);
}

@Override
Expand Down Expand Up @@ -76,34 +99,44 @@ public void connectionSelected(String id) {}
/** Called at the start of each RPC attempt: resets retry state and starts attempt metrics. */
public void attemptStarted(int attemptNumber) {
  // Reset per-attempt state; attemptFailed/attemptFailedRetriesExhausted overwrite it later.
  lastRetryStatus = RetryStatus.PERMANENT_FAILURE;
  rpcTimer = rpcMetrics.timeRpc();
  // Track in-flight RPC count and total RPCs issued.
  activeRpcCounter.inc();
  rpcMeter.mark();
}

/** Stops the per-attempt timer and marks the RPC as no longer in flight. */
@Override
public void attemptSucceeded() {
  rpcTimer.close();
  activeRpcCounter.dec();
}

/** Same bookkeeping as success: close the attempt timer and decrement the in-flight gauge. */
@Override
public void attemptCancelled() {
  rpcTimer.close();
  activeRpcCounter.dec();
}

/**
 * Records a retryable attempt failure: closes the attempt timer, counts the retry, and buckets
 * the error by its gRPC status code (e.g. {@code grpc.errors.UNAVAILABLE}).
 */
@Override
public void attemptFailed(Throwable error, Duration delay) {
  rpcTimer.close();
  activeRpcCounter.dec();
  rpcMetrics.markRetry();
  lastRetryStatus = RetryStatus.ATTEMPT_RETRYABLE_FAILURE;
  // One meter per status code keeps error causes separable in the metrics backend.
  String errorMeterName = "grpc.errors." + Status.fromThrowable(error).getCode();
  BigtableClientMetrics.meter(MetricLevel.Info, errorMeterName).mark();
}

/** Final attempt failed with retries used up; remembered for operation-level classification. */
@Override
public void attemptFailedRetriesExhausted(Throwable error) {
  rpcTimer.close();
  activeRpcCounter.dec();
  // NOTE(review): consumed by operation-completion code outside this view — confirm.
  lastRetryStatus = RetryStatus.RETRIES_EXHAUSTED;
}

/** Attempt failed with a non-retryable error; close out attempt metrics and record the status. */
@Override
public void attemptPermanentFailure(Throwable error) {
  rpcTimer.close();
  activeRpcCounter.dec();
  lastRetryStatus = RetryStatus.PERMANENT_FAILURE;
}

Expand All @@ -118,7 +151,11 @@ public void lroStartSucceeded() {
}

/**
 * Stops the ReadRows first-response timer exactly once, on the first response received.
 *
 * <p>This span reconstructs the post-commit method; the scrape left the removed empty
 * pre-commit implementation interleaved above the new one.
 */
@Override
public void responseReceived() {
  // compareAndSet guarantees only the first response (even under racing callbacks) closes it;
  // firstResponseTimer is null for everything except streaming ReadRows.
  if (firstResponseTimer != null && firstResponseRecorded.compareAndSet(false, true)) {
    firstResponseTimer.close();
  }
}

@Override
public void requestSent() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MetricsApiTracerAdapterFactory implements ApiTracerFactory {
/**
 * Creates a per-operation tracer wired to this factory's per-method {@link RpcMetrics},
 * forwarding the method name and operation type so the adapter can special-case streaming
 * ReadRows. This span reconstructs the post-commit method; the scrape left the removed
 * single-argument call interleaved with the new one.
 */
@Override
public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
  RpcMetrics rpcMetrics = getRpcMetrics(spanName);
  return new MetricsApiTracerAdapter(rpcMetrics, spanName.getMethodName(), operationType);
}

@VisibleForTesting
Expand Down
10 changes: 10 additions & 0 deletions bigtable-hbase-1.x-parent/bigtable-hbase-1.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ limitations under the License.
<version>${hbase1-metrics.version}</version>
</dependency>

<!-- deps for metrics instrumentation, these should be removed once we expose more hooks in veneer metrics -->
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</dependency>

<!-- These are direct dependencies of TestBigtableConnection.java which is in test scope,
but they are transitive dependencies of bigtable-client-core, which needs them during runtime.
For more information please see https://github.com/googleapis/java-bigtable-hbase/pull/2447 -->
Expand Down
Loading

0 comments on commit 68c2773

Please sign in to comment.