Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLICATION_BLOCKING_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES2_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_BLOCKING_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CONNECTIVITY_ERROR_COUNT_NAME;
Expand Down Expand Up @@ -284,6 +285,7 @@ static class PublicTimeSeriesConverter implements TimeSeriesConverter {
ImmutableSet.of(
OPERATION_LATENCIES_NAME,
ATTEMPT_LATENCIES_NAME,
ATTEMPT_LATENCIES2_NAME,
SERVER_LATENCIES_NAME,
FIRST_RESPONSE_LATENCIES_NAME,
CLIENT_BLOCKING_LATENCIES_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracer.TransportAttrs;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.Status;

/**
* Records the time a request is enqueued in a grpc channel queue. This is a bridge between gRPC stream
* tracing and Bigtable tracing. Its primary purpose is to measure the transition time between
* asking gRPC to start an RPC and gRPC actually serializing that RPC.
*/
class BigtableGrpcStreamTracer extends ClientStreamTracer {
private static final String GRPC_LB_LOCALITY_KEY = "grpc.lb.locality";
private static final String GRPC_LB_BACKEND_SERVICE_KEY = "grpc.lb.backend_service";

private final StreamInfo info;
private final BigtableTracer tracer;
private volatile String backendService = null;
private volatile String locality = null;

public BigtableGrpcStreamTracer(BigtableTracer tracer) {
public BigtableGrpcStreamTracer(StreamInfo info, BigtableTracer tracer) {
this.info = info;
this.tracer = tracer;
}

Expand All @@ -36,6 +44,26 @@ public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalU
tracer.grpcMessageSent();
}

/**
 * Captures the load-balancer labels this tracer cares about and forwards every label to the
 * superclass.
 *
 * <p>Only the {@code grpc.lb.locality} and {@code grpc.lb.backend_service} keys are remembered;
 * all other keys are passed through untouched.
 *
 * @param key name of the optional label reported by gRPC
 * @param value value of the optional label
 */
@Override
public void addOptionalLabel(String key, String value) {
  if (key.equals(GRPC_LB_LOCALITY_KEY)) {
    this.locality = value;
  } else if (key.equals(GRPC_LB_BACKEND_SERVICE_KEY)) {
    this.backendService = value;
  }

  super.addOptionalLabel(key, value);
}

/**
 * Hands the labels collected for this stream to the Bigtable tracer, then lets the superclass
 * process the close event.
 *
 * @param status final status of the stream, passed through to the superclass unchanged
 */
@Override
public void streamClosed(Status status) {
  // Either value may still be null if gRPC never reported the corresponding label.
  final TransportAttrs collectedAttrs = TransportAttrs.create(locality, backendService);
  tracer.setTransportAttrs(collectedAttrs);

  super.streamClosed(status);
}

static class Factory extends ClientStreamTracer.Factory {

private final BigtableTracer tracer;
Expand All @@ -47,7 +75,7 @@ static class Factory extends ClientStreamTracer.Factory {
@Override
public ClientStreamTracer newClientStreamTracer(
ClientStreamTracer.StreamInfo info, Metadata headers) {
return new BigtableGrpcStreamTracer(tracer);
return new BigtableGrpcStreamTracer(info, tracer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public void setLocations(String zone, String cluster) {
// noop
}

/**
* Set the attributes of the underlying transport used to process the attempt.
*
* <p>This default implementation intentionally does nothing; tracers that report transport labels
* override it.
*
* @param attrs transport attributes (locality and backend service) observed for the attempt
*/
public void setTransportAttrs(BuiltinMetricsTracer.TransportAttrs attrs) {}

@Deprecated
/** @deprecated {@link #grpcMessageSent()} is called instead. */
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,18 @@ public class BuiltinMetricsConstants {
static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");

static final AttributeKey<String> TRANSPORT_TYPE = AttributeKey.stringKey("transport_type");
static final AttributeKey<String> TRANSPORT_REGION = AttributeKey.stringKey("transport_region");
static final AttributeKey<String> TRANSPORT_ZONE = AttributeKey.stringKey("transport_zone");
static final AttributeKey<String> TRANSPORT_SUBZONE = AttributeKey.stringKey("transport_subzone");

public static final String METER_NAME = "bigtable.googleapis.com/internal/client/";

// Metric names
public static final String OPERATION_LATENCIES_NAME = "operation_latencies";
public static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
// Temporary workaround for not being able to add new labels to ATTEMPT_LATENCIES_NAME
public static final String ATTEMPT_LATENCIES2_NAME = "attempt_latencies2";
static final String RETRY_COUNT_NAME = "retry_count";
static final String CONNECTIVITY_ERROR_COUNT_NAME = "connectivity_error_count";
static final String SERVER_LATENCIES_NAME = "server_latencies";
Expand Down Expand Up @@ -211,6 +218,22 @@ public static Map<InstrumentSelector, View> getAllViews() {
.addAll(COMMON_ATTRIBUTES)
.add(STREAMING_KEY, STATUS_KEY)
.build());
defineView(
views,
ATTEMPT_LATENCIES2_NAME,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms",
ImmutableSet.<AttributeKey>builder()
.addAll(COMMON_ATTRIBUTES)
.add(
STREAMING_KEY,
STATUS_KEY,
TRANSPORT_TYPE,
TRANSPORT_REGION,
TRANSPORT_ZONE,
TRANSPORT_SUBZONE)
.build());
defineView(
views,
SERVER_LATENCIES_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,34 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STATUS_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STREAMING_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TRANSPORT_REGION;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TRANSPORT_SUBZONE;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TRANSPORT_TYPE;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TRANSPORT_ZONE;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY;

import com.google.api.core.ObsoleteApi;
import com.google.api.gax.retrying.ServerStreamingAttemptException;
import com.google.api.gax.tracing.SpanName;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.Version;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.math.IntMath;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.grpc.Deadline;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

Expand All @@ -49,8 +59,23 @@
* bigtable.googleapis.com/client namespace
*/
class BuiltinMetricsTracer extends BigtableTracer {
/**
* Immutable holder for the transport labels observed on a gRPC stream: the load-balancer locality
* and the backend service. Either value may be null.
*
* <p>NOTE: keep the abstract getters in their current order. {@link #create} passes {@code
* (locality, backendService)} positionally to the AutoValue-generated constructor.
*/
@AutoValue
abstract static class TransportAttrs {
/**
* Raw locality label as reported by gRPC, or null if none was reported. When non-empty it is
* parsed as a JSON object of string values, from which the {@code region}, {@code zone} and
* {@code sub_zone} entries are read.
*/
@Nullable
abstract String getLocality();

/** Backend service label as reported by gRPC, or null if none was reported. */
@Nullable
abstract String getBackendService();

/**
* Creates a new instance.
*
* @param locality raw locality label, may be null
* @param backendService backend service label, may be null
*/
static TransportAttrs create(@Nullable String locality, @Nullable String backendService) {
return new AutoValue_BuiltinMetricsTracer_TransportAttrs(locality, backendService);
}
}

private static final Logger logger = Logger.getLogger(BuiltinMetricsTracer.class.getName());
private static final Gson GSON = new Gson();
private static final TypeToken<Map<String, String>> LOCALITY_TYPE =
new TypeToken<Map<String, String>>() {};

private static final String NAME = "java-bigtable/" + Version.VERSION;
private final OperationType operationType;
Expand Down Expand Up @@ -95,12 +120,15 @@ class BuiltinMetricsTracer extends BigtableTracer {
private Deadline operationDeadline = null;
private volatile long remainingDeadlineAtAttemptStart = 0;

private TransportAttrs transportAttrs = null;

// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
// to milliseconds and use DoubleHistogram. This should minimize the chance of a data
// point falling on a bucket boundary and causing off-by-one errors.
private final DoubleHistogram operationLatenciesHistogram;
private final DoubleHistogram attemptLatenciesHistogram;
private final DoubleHistogram attemptLatencies2Histogram;
private final DoubleHistogram serverLatenciesHistogram;
private final DoubleHistogram firstResponseLatenciesHistogram;
private final DoubleHistogram clientBlockingLatenciesHistogram;
Expand All @@ -115,6 +143,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
Attributes attributes,
DoubleHistogram operationLatenciesHistogram,
DoubleHistogram attemptLatenciesHistogram,
DoubleHistogram attemptLatencies2Histogram,
DoubleHistogram serverLatenciesHistogram,
DoubleHistogram firstResponseLatenciesHistogram,
DoubleHistogram clientBlockingLatenciesHistogram,
Expand All @@ -128,6 +157,7 @@ class BuiltinMetricsTracer extends BigtableTracer {

this.operationLatenciesHistogram = operationLatenciesHistogram;
this.attemptLatenciesHistogram = attemptLatenciesHistogram;
this.attemptLatencies2Histogram = attemptLatencies2Histogram;
this.serverLatenciesHistogram = serverLatenciesHistogram;
this.firstResponseLatenciesHistogram = firstResponseLatenciesHistogram;
this.clientBlockingLatenciesHistogram = clientBlockingLatenciesHistogram;
Expand Down Expand Up @@ -301,6 +331,11 @@ public void setLocations(String zone, String cluster) {
this.cluster = cluster;
}

/**
* Remembers the transport attributes so they can be attached as transport labels when the
* attempt latency is recorded. Replaces any previously stored value.
*
* <p>NOTE(review): the backing field is not volatile, while this setter is reached from the gRPC
* stream tracer's {@code streamClosed} callback; confirm that the read during attempt completion
* is always ordered after this write.
*
* @param attrs transport attributes observed for the attempt
*/
@Override
public void setTransportAttrs(TransportAttrs attrs) {
this.transportAttrs = attrs;
}

@Override
public void batchRequestThrottled(long throttledTimeNanos) {
totalClientBlockingTime.addAndGet(java.time.Duration.ofNanos(throttledTimeNanos).toMillis());
Expand Down Expand Up @@ -417,6 +452,35 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
attemptLatenciesHistogram.record(
convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)), attributes);

String transportType = "cloudpath";
String transportRegion = "";
String transportZone = "";
String transportSubzone = "";

try {
if (transportAttrs != null && !Strings.isNullOrEmpty(transportAttrs.getLocality())) {
// only directpath has locality
transportType = "directpath";
Map<String, String> localityMap =
GSON.fromJson(transportAttrs.getLocality(), LOCALITY_TYPE);
transportRegion = localityMap.getOrDefault("region", "");
transportZone = localityMap.getOrDefault("zone", "");
transportSubzone = localityMap.getOrDefault("sub_zone", "");
}
} catch (RuntimeException e) {
logger.log(
Level.WARNING, "Failed to parse transport locality: " + transportAttrs.getLocality(), e);
}
attemptLatencies2Histogram.record(
convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)),
attributes
.toBuilder()
.put(TRANSPORT_TYPE, transportType)
.put(TRANSPORT_REGION, transportRegion)
.put(TRANSPORT_ZONE, transportZone)
.put(TRANSPORT_SUBZONE, transportSubzone)
.build());

// When operationDeadline is set, it's possible that the deadline is passed by the time we send
// a new attempt. In this case we'll record 0.
if (operationDeadline != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLICATION_BLOCKING_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES2_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ATTEMPT_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CLIENT_BLOCKING_LATENCIES_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.CONNECTIVITY_ERROR_COUNT_NAME;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {

private final DoubleHistogram operationLatenciesHistogram;
private final DoubleHistogram attemptLatenciesHistogram;
private final DoubleHistogram attemptLatencies2Histogram;
private final DoubleHistogram serverLatenciesHistogram;
private final DoubleHistogram firstResponseLatenciesHistogram;
private final DoubleHistogram clientBlockingLatenciesHistogram;
Expand Down Expand Up @@ -82,6 +84,12 @@ public static BuiltinMetricsTracerFactory create(
.setDescription("Client observed latency per RPC attempt.")
.setUnit(MILLISECOND)
.build();
attemptLatencies2Histogram =
meter
.histogramBuilder(ATTEMPT_LATENCIES2_NAME)
.setDescription("Client observed latency per RPC attempt with transport labels.")
.setUnit(MILLISECOND)
.build();
serverLatenciesHistogram =
meter
.histogramBuilder(SERVER_LATENCIES_NAME)
Expand Down Expand Up @@ -140,6 +148,7 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
attributes,
operationLatenciesHistogram,
attemptLatenciesHistogram,
attemptLatencies2Histogram,
serverLatenciesHistogram,
firstResponseLatenciesHistogram,
clientBlockingLatenciesHistogram,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ public void setLocations(String zone, String cluster) {
}
}

/**
 * Forwards the transport attributes to every wrapped tracer, in iteration order.
 *
 * @param attrs transport attributes to pass to each child tracer
 */
@Override
public void setTransportAttrs(BuiltinMetricsTracer.TransportAttrs attrs) {
  for (BigtableTracer child : bigtableTracers) {
    child.setTransportAttrs(attrs);
  }
}

@Override
public void onRequest(int requestCount) {
for (BigtableTracer tracer : bigtableTracers) {
Expand Down
Loading