Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new target attribute to attempts and operation attributes #2260

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ public static ClientContext createClientContext(EnhancedBigtableStubSettings set
}

managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());

if (oldChannelConfigurator != null) {
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private static final int MAX_MESSAGE_SIZE = 256 * 1024 * 1024;
private static final String SERVER_DEFAULT_APP_PROFILE_ID = "";

private static final String CBT_ENABLE_DIRECTPATH = "CBT_ENABLE_DIRECTPATH";
private static final Set<Code> IDEMPOTENT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);

Expand Down Expand Up @@ -345,7 +346,17 @@ public boolean getEnableRetryInfo() {

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
String enableDirectpathEnv = System.getenv(CBT_ENABLE_DIRECTPATH);
Boolean isDirectpathEnabled = Boolean.parseBoolean(enableDirectpathEnv);

InstantiatingGrpcChannelProvider.Builder grpcTransportProviderBuilder =
BigtableStubSettings.defaultGrpcTransportProviderBuilder();
if (isDirectpathEnabled) {
// Attempts direct access to CBT service over gRPC to improve throughput,
// whether the attempt is allowed is totally controlled by service owner.
grpcTransportProviderBuilder.setAttemptDirectPathXds().setAttemptDirectPath(true);
}
return grpcTransportProviderBuilder
meeral-k marked this conversation as resolved.
Show resolved Hide resolved
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
Expand All @@ -356,10 +367,7 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)) // wait this long before considering the connection dead
// Attempts direct access to CBT service over gRPC to improve throughput,
// whether the attempt is allowed is totally controlled by service owner.
.setAttemptDirectPath(true);
Duration.ofSeconds(10)); // wait this long before considering the connection dead
}

@SuppressWarnings("WeakerAccess")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ public void setLocations(String zone, String cluster) {
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}

public void addTarget(String target) {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -48,6 +49,7 @@ public class BuiltinMetricsConstants {
public static final AttributeKey<String> CLIENT_NAME_KEY = AttributeKey.stringKey("client_name");
static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
static AttributeKey<List<String>> TARGET_KEY = AttributeKey.stringArrayKey("target");
static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");

// Metric names
Expand Down Expand Up @@ -107,7 +109,8 @@ public class BuiltinMetricsConstants {
CLUSTER_ID_KEY,
ZONE_ID_KEY,
METHOD_KEY,
CLIENT_NAME_KEY);
CLIENT_NAME_KEY,
TARGET_KEY);

static void defineView(
ImmutableMap.Builder<InstrumentSelector, View> viewMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,21 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STATUS_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.STREAMING_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TABLE_ID_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TARGET_KEY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY;

import com.google.api.gax.retrying.ServerStreamingAttemptException;
import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.Version;
import com.google.common.base.Stopwatch;
import com.google.common.math.IntMath;
import io.grpc.CallOptions;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.StringUtils;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -46,6 +51,8 @@
*/
class BuiltinMetricsTracer extends BigtableTracer {

static final CallOptions.Key<BuiltinMetricsTracer> BUILTIN_METRICSTRACER_KEY =
meeral-k marked this conversation as resolved.
Show resolved Hide resolved
CallOptions.Key.create("builtin-metrics-tracer");
private static final String NAME = "java-bigtable/" + Version.VERSION;
private final OperationType operationType;
private final SpanName spanName;
Expand Down Expand Up @@ -85,6 +92,8 @@ class BuiltinMetricsTracer extends BigtableTracer {

private Long serverLatencies = null;

private HashSet<String> targets = new HashSet<>();

// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
// to milliseconds and use DoubleHistogram. This should minimize the chance of a data
Expand Down Expand Up @@ -175,6 +184,12 @@ public void attemptSucceeded() {
recordAttemptCompletion(null);
}

public void addTarget(String target) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setAttemptTarget - there should only be a single target per attempt ... add implies there are multiple targets

if (!StringUtils.isNullOrEmpty(target)) {
this.targets.add(target);
}
}

@Override
public void attemptCancelled() {
recordAttemptCompletion(new CancellationException());
Expand Down Expand Up @@ -293,6 +308,7 @@ private void recordOperationCompletion(@Nullable Throwable status) {
.put(CLIENT_NAME_KEY, NAME)
.put(STREAMING_KEY, isStreaming)
.put(STATUS_KEY, statusStr)
.put(TARGET_KEY, new ArrayList<>(this.targets))
.build();

long operationLatencyNano = operationTimer.elapsed(TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -338,7 +354,6 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}

String statusStr = Util.extractStatus(status);

Attributes attributes =
baseAttributes
.toBuilder()
Expand All @@ -349,6 +364,7 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
.put(CLIENT_NAME_KEY, NAME)
.put(STREAMING_KEY, isStreaming)
.put(STATUS_KEY, statusStr)
.put(TARGET_KEY, new ArrayList<>(this.targets))
meeral-k marked this conversation as resolved.
Show resolved Hide resolved
.build();

clientBlockingLatenciesHistogram.record(convertToMs(totalClientBlockingTime.get()), attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.api.gax.tracing.ApiTracer;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.internal.StringUtils;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you forget to push? its still present

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -225,4 +226,12 @@ public void grpcChannelQueuedLatencies(long queuedTimeMs) {
tracer.grpcChannelQueuedLatencies(queuedTimeMs);
}
}

public void addTarget(String target) {
if (StringUtils.isNullOrEmpty(target)) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.addTarget(target);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.tags.TagValue;
import io.opentelemetry.api.internal.StringUtils;
meeral-k marked this conversation as resolved.
Show resolved Hide resolved
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
Expand All @@ -64,7 +65,7 @@ public class Util {
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");
static final Metadata.Key<byte[]> LOCATION_METADATA_KEY =
Metadata.Key.of("x-goog-ext-425905942-bin", Metadata.BINARY_BYTE_MARSHALLER);

/** Convert an exception into a value that can be used to create an OpenCensus tag value. */
static String extractStatus(@Nullable Throwable error) {
final String statusString;
Expand Down Expand Up @@ -209,8 +210,18 @@ static void recordMetricsFromMetadata(
if (responseParams != null && latency == null) {
latency = 0L;
}

// Record gfe metrics
tracer.recordGfeMetadata(latency, throwable);
if (responseMetadata.getMetadata() != null) {
Metadata.Key<String> remoteAddressKey =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who sets this metadata?

Metadata.Key.of(
"io.grpc.grpc.transport_attr_remote_addr", Metadata.ASCII_STRING_MARSHALLER);
String remoteAddr = responseMetadata.getMetadata().get(remoteAddressKey);
if (!StringUtils.isNullOrEmpty(remoteAddr)) {
tracer.addTarget(remoteAddr);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ public static void verifyAttributes(MetricData metricData, Attributes attributes
case HISTOGRAM:
List<HistogramPointData> hd =
metricData.getHistogramData().getPoints().stream()
.peek(item -> System.out.println(item))
meeral-k marked this conversation as resolved.
Show resolved Hide resolved
.filter(pd -> pd.getAttributes().equals(attributes))
.collect(Collectors.toList());

assertThat(hd).isNotEmpty();
break;
case LONG_SUM:
Expand Down
Loading
Loading