Skip to content

Commit

Permalink
Make Cosmos logical spans CLIENT and current (Azure#25571)
Browse files Browse the repository at this point in the history
* Make Cosmos logical spans CLIENT and current

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
lmolkova and trask authored Nov 30, 2021
1 parent c32af8d commit 8474a1d
Show file tree
Hide file tree
Showing 5 changed files with 454 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,12 @@
<Method name="subscribe"/>
<Bug pattern="DE_MIGHT_IGNORE"/>
</Match>

<Match>
<Class name="com.azure.cosmos.implementation.TracerProvider"/>
<Method name="subscribe"/>
<Bug pattern="DE_MIGHT_IGNORE"/>
</Match>

<!-- Exception is ignored by design: it indicates a non-parsable id -->
<Match>
<Class name="com.azure.cosmos.implementation.ResourceId"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package com.azure.cosmos.implementation;

import com.azure.core.util.Context;
import com.azure.core.util.tracing.SpanKind;
import com.azure.core.util.tracing.StartSpanOptions;
import com.azure.core.util.tracing.Tracer;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
Expand All @@ -21,8 +23,13 @@
import org.HdrHistogram.ConcurrentDoubleHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.ContextView;

import java.time.Duration;
import java.time.OffsetDateTime;
Expand All @@ -32,7 +39,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
Expand All @@ -53,14 +59,47 @@ public class TracerProvider {
public static final String RESOURCE_PROVIDER_NAME = "Microsoft.DocumentDB";
public final Duration CRUD_THRESHOLD_FOR_DIAGNOSTICS = Duration.ofMillis(100);
public final Duration QUERY_THRESHOLD_FOR_DIAGNOSTICS = Duration.ofMillis(500);

private static final String REACTOR_TRACING_CONTEXT_KEY = "tracing-context";
private static final Object DUMMY_VALUE = new Object();
private final Mono<Object> propagatingMono;
private final Flux<Object> propagatingFlux;
/**
* Creates a provider around the given tracer and instantiates the propagating
* Mono/Flux helpers that publishers are chained onto by this class.
*
* @param tracer tracer to delegate to; {@link #isEnabled()} returns false when it is null.
*/
public TracerProvider(Tracer tracer) {
this.tracer = tracer;
this.propagatingMono = new PropagatingMono();
this.propagatingFlux = new PropagatingFlux();
}

/**
* Indicates whether tracing is enabled, i.e. whether a tracer was supplied.
*
* @return true when the underlying tracer is non-null.
*/
public boolean isEnabled() {
return tracer != null;
}

/**
 * Looks up the tracing {@link Context} stored in a Reactor {@link ContextView}.
 *
 * @param reactorContext Reactor context view to read from.
 * @return the stored {@link Context}, or null when nothing is stored under the tracing key
 * or the stored value is not a {@link Context}.
 */
public static Context getContextFromReactorOrNull(ContextView reactorContext) {
    final Object stored = reactorContext.getOrDefault(REACTOR_TRACING_CONTEXT_KEY, null);

    // instanceof is false for null, so a missing entry falls through to the null result
    return stored instanceof Context ? (Context) stored : null;
}

/**
* Stores {@link Context} in a new Reactor {@link reactor.util.context.Context} under the
* tracing key that {@link #getContextFromReactorOrNull(ContextView)} reads.
*
* @param traceContext {@link Context} context with trace context to store.
* @return {@link reactor.util.context.Context} Reactor context holding only the trace context entry.
*/
public static reactor.util.context.Context setContextInReactor(Context traceContext) {
return reactor.util.context.Context.of(REACTOR_TRACING_CONTEXT_KEY, traceContext);
}

/**
* For each tracer plugged into the SDK a new tracing span is created.
* <p>
Expand All @@ -73,16 +112,18 @@ public boolean isEnabled() {
*/
public Context startSpan(String methodName, String databaseId, String endpoint, Context context) {
Context local = Objects.requireNonNull(context, "'context' cannot be null.");
local = local.addData(AZ_TRACING_NAMESPACE_KEY, RESOURCE_PROVIDER_NAME);
local = tracer.start(methodName, local); // start the span and return the started span

StartSpanOptions spanOptions = new StartSpanOptions(SpanKind.CLIENT)
.setAttribute(AZ_TRACING_NAMESPACE_KEY, RESOURCE_PROVIDER_NAME)
.setAttribute(DB_TYPE, DB_TYPE_VALUE)
.setAttribute(TracerProvider.DB_URL, endpoint)
.setAttribute(TracerProvider.DB_STATEMENT, methodName);
if (databaseId != null) {
tracer.setAttribute(TracerProvider.DB_INSTANCE, databaseId, local);
spanOptions.setAttribute(TracerProvider.DB_INSTANCE, databaseId);
}

tracer.setAttribute(TracerProvider.DB_TYPE, DB_TYPE_VALUE, local);
tracer.setAttribute(TracerProvider.DB_URL, endpoint, local);
tracer.setAttribute(TracerProvider.DB_STATEMENT, methodName, local);
return local;
// start the span and return the started span
return tracer.start(methodName, spanOptions, local);
}

/**
Expand All @@ -106,17 +147,19 @@ public void addEvent(String name, Map<String, Object> attributes, OffsetDateTime
* Given a context containing the current tracing span the span is marked completed with status info from
* {@link Signal}. For each tracer plugged into the SDK the current tracing span is marked as completed.
*
* @param context Additional metadata that is passed through the call stack.
* @param signal The signal indicates the status and contains the metadata we need to end the tracing span.
*/
public <T extends CosmosResponse<? extends Resource>> void endSpan(Context context,
Signal<T> signal,
int statusCode) {
Objects.requireNonNull(context, "'context' cannot be null.");
public <T> void endSpan(Signal<T> signal, int statusCode) {
Objects.requireNonNull(signal, "'signal' cannot be null.");

Context context = getContextFromReactorOrNull(signal.getContextView());
if (context == null) {
return;
}

switch (signal.getType()) {
case ON_COMPLETE:
case ON_NEXT:
end(statusCode, null, context);
break;
case ON_ERROR:
Expand All @@ -133,7 +176,7 @@ public <T extends CosmosResponse<? extends Resource>> void endSpan(Context conte
end(statusCode, throwable, context);
break;
default:
// ON_SUBSCRIBE and ON_NEXT don't have the information to end the span so just return.
// ON_SUBSCRIBE isn't the right state to end span
break;
}
}
Expand Down Expand Up @@ -190,6 +233,20 @@ public <T> Mono<CosmosItemResponse<T>> traceEnabledCosmosItemResponsePublisher(M
thresholdForDiagnosticsOnTracer);
}

/**
* Runs given {@code Flux<T>} publisher in the scope of trace context passed in using
* {@link TracerProvider#setContextInReactor(Context)} in {@code contextWrite}.
* Populates active trace context on Reactor's hot path. Reactor's instrumentation for OpenTelemetry
* (or other hypothetical solution) will take care of the cold path.
* <p>
* When no trace context is present in the Reactor context the publisher is still run,
* just without a span being made current.
*
* @param <T> type of the elements emitted by the publisher.
* @param publisher publisher to run.
* @return wrapped publisher.
*/
public <T> Flux<T> runUnderSpanInContext(Flux<T> publisher) {
// the placeholder value from propagatingFlux is discarded; it only triggers the inner subscription
return propagatingFlux
.flatMap(ignored -> publisher);
}

private <T> Mono<T> traceEnabledPublisher(Mono<T> resultPublisher,
Context context,
String spanName,
Expand All @@ -198,41 +255,55 @@ private <T> Mono<T> traceEnabledPublisher(Mono<T> resultPublisher,
Function<T, Integer> statusCodeFunc,
Function<T, CosmosDiagnostics> diagnosticFunc,
Duration thresholdForDiagnosticsOnTracer) {
final AtomicReference<Context> parentContext = new AtomicReference<>(Context.NONE);

if (!isEnabled()) {
return resultPublisher;
}

Optional<Object> callDepth = context.getData(COSMOS_CALL_DEPTH);
final boolean isNestedCall = callDepth.isPresent();
return resultPublisher
.doOnSubscribe(ignoredValue -> {
if (isEnabled() && !isNestedCall) {
parentContext.set(this.startSpan(spanName, databaseId, endpoint,
context));
}
}).doOnSuccess(response -> {
if (isEnabled() && !isNestedCall) {
CosmosDiagnostics cosmosDiagnostics = diagnosticFunc.apply(response);
try {
Duration threshold = thresholdForDiagnosticsOnTracer;
if(threshold == null) {
threshold = CRUD_THRESHOLD_FOR_DIAGNOSTICS;
}
if (isNestedCall) {
return resultPublisher;
}

if (cosmosDiagnostics != null
&& cosmosDiagnostics.getDuration() != null
&& cosmosDiagnostics.getDuration().compareTo(threshold) > 0) {
addDiagnosticsOnTracerEvent(cosmosDiagnostics, parentContext.get());
// propagatingMono ensures active span is propagated to the `resultPublisher`
// subscription and hot path. OpenTelemetry reactor's instrumentation will
// propagate it on the cold path.
return propagatingMono
.flatMap(ignored -> resultPublisher)
.doOnEach(signal -> {
switch (signal.getType()) {
case ON_NEXT:
T response = signal.get();
Context traceContext = getContextFromReactorOrNull(signal.getContextView());
CosmosDiagnostics cosmosDiagnostics = diagnosticFunc.apply(response);
try {
Duration threshold = thresholdForDiagnosticsOnTracer;
if (threshold == null) {
threshold = CRUD_THRESHOLD_FOR_DIAGNOSTICS;
}

if (cosmosDiagnostics != null
&& cosmosDiagnostics.getDuration() != null
&& cosmosDiagnostics.getDuration().compareTo(threshold) > 0) {
addDiagnosticsOnTracerEvent(cosmosDiagnostics, traceContext);
}
} catch (JsonProcessingException ex) {
LOGGER.warn("Error while serializing diagnostics for tracer", ex.getMessage());
}
} catch (JsonProcessingException ex) {
LOGGER.warn("Error while serializing diagnostics for tracer", ex.getMessage());
}
this.endSpan(parentContext.get(), Signal.complete(), statusCodeFunc.apply(response));
}
}).doOnError(throwable -> {
if (isEnabled() && !isNestedCall) {
// not adding diagnostics on trace event for exception as this information is already there as
// part of exception message
this.endSpan(parentContext.get(), Signal.error(throwable), ERROR_CODE);
}
});

this.endSpan(signal, statusCodeFunc.apply(response));
break;
case ON_ERROR:
// not adding diagnostics on trace event for exception as this information is already there as
// part of exception message
this.endSpan(signal, ERROR_CODE);
break;
default:
break;
}})
.contextWrite(setContextInReactor(this.startSpan(spanName, databaseId, endpoint,
context)));
}

private <T> Mono<T> publisherWithClientTelemetry(Mono<T> resultPublisher,
Expand Down Expand Up @@ -364,7 +435,7 @@ private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient,
}

private void addDiagnosticsOnTracerEvent(CosmosDiagnostics cosmosDiagnostics, Context context) throws JsonProcessingException {
if (cosmosDiagnostics == null) {
if (cosmosDiagnostics == null || context == null) {
return;
}

Expand Down Expand Up @@ -499,4 +570,52 @@ private void addDiagnosticsOnTracerEvent(CosmosDiagnostics cosmosDiagnostics, Co
this.addEvent("ClientCfgs", attributes,
OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);
}

/**
 * Hands {@code actual} a scalar subscription carrying a placeholder value. When a trace
 * context is found in the subscriber's Reactor context, the span is made current for the
 * duration of the {@code onSubscribe} call.
 *
 * @param tracer tracer used to make the span current.
 * @param actual downstream subscriber.
 */
private static void subscribe(Tracer tracer, CoreSubscriber<? super Object> actual) {
    final Context traceContext = getContextFromReactorOrNull(actual.currentContext());

    if (traceContext == null) {
        // nothing to propagate: subscribe without touching the current span
        actual.onSubscribe(Operators.scalarSubscription(actual, DUMMY_VALUE));
        return;
    }

    final AutoCloseable scope = tracer.makeSpanCurrent(traceContext);
    try {
        actual.onSubscribe(Operators.scalarSubscription(actual, DUMMY_VALUE));
    } finally {
        // the scope is always closed, even when onSubscribe throws
        try {
            scope.close();
        } catch (Exception ignored) {
            // can't happen
        }
    }
}

/**
 * Mono whose subscription (and anything on the hot path) runs in the scope of the
 * trace context. This lets OpenTelemetry auto-collection pick it up and correlate
 * lower levels of instrumentation and logs to logical Cosmos spans.
 *
 * OpenTelemetry reactor auto-instrumentation will take care of the cold path.
 */
private final class PropagatingMono extends Mono<Object> {
    @Override
    public void subscribe(CoreSubscriber<? super Object> actual) {
        // delegate to the shared helper with the enclosing provider's tracer
        TracerProvider.subscribe(TracerProvider.this.tracer, actual);
    }
}

/**
 * Flux whose subscription (and anything on the hot path) runs in the scope of the
 * trace context. This lets OpenTelemetry auto-collection pick it up and correlate
 * lower levels of instrumentation and logs to logical Cosmos spans.
 *
 * OpenTelemetry reactor auto-instrumentation will take care of the cold path.
 */
private final class PropagatingFlux extends Flux<Object> {
    @Override
    public void subscribe(CoreSubscriber<? super Object> actual) {
        // delegate to the shared helper with the enclosing provider's tracer
        TracerProvider.subscribe(TracerProvider.this.tracer, actual);
    }
}
}
Loading

0 comments on commit 8474a1d

Please sign in to comment.