Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Filibuster. #1

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion instrumentation-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ dependencies {
testImplementation("org.awaitility:awaitility")
testImplementation("io.opentelemetry:opentelemetry-sdk-metrics")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.opentelemetry.instrumentation.api.filibuster;

import io.opentelemetry.context.ContextKey;

public class OpenTelemetryContextStorageConstants {
final public static ContextKey<String> VCLOCK_KEY = ContextKey.named("filibuster-vclock");
final public static ContextKey<String> ORIGIN_VCLOCK_KEY = ContextKey.named("filibuster-origin-vclock");
final public static ContextKey<String> REQUEST_ID_KEY = ContextKey.named("filibuster-request-id");
final public static ContextKey<String> EXECUTION_INDEX_KEY = ContextKey.named("filibuster-execution-index");
}
17 changes: 17 additions & 0 deletions instrumentation/armeria-1.3/library/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,22 @@ plugins {
dependencies {
library("com.linecorp.armeria:armeria:1.3.0")

library("cloud.filibuster:instrumentation:0.19-SNAPSHOT")

library("com.github.cliftonlabs:json-simple:2.1.2")
library("org.json:json:20210307")

testImplementation(project(":instrumentation:armeria-1.3:testing"))
}

repositories {
mavenCentral()

maven {
url = uri("https://maven.pkg.github.com/filibuster-testing/filibuster-java")
credentials {
username = project.findProperty("gpr.user") as String? ?: System.getenv("GITHUB_USERNAME")
password = project.findProperty("gpr.key") as String? ?: System.getenv("GITHUB_TOKEN")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.function.Function;

/** Entrypoint for tracing Armeria services or clients. */
@SuppressWarnings("FieldCanBeLocal")
public final class ArmeriaTracing {

/** Returns a new {@link ArmeriaTracing} configured with the given {@link OpenTelemetry}. */
Expand All @@ -41,14 +42,14 @@ public static ArmeriaTracingBuilder newBuilder(OpenTelemetry openTelemetry) {
* com.linecorp.armeria.client.ClientBuilder#decorator(Function)}.
*/
public Function<? super HttpClient, ? extends HttpClient> newClientDecorator() {
return client -> new OpenTelemetryClient(client, clientInstrumenter);
return client -> new OpenTelemetryFilibusterDecoratingHttpClient(client, clientInstrumenter);
}

/**
* Returns a new {@link HttpService} decorator for use with methods like {@link
* HttpService#decorate(Function)}.
*/
public Function<? super HttpService, ? extends HttpService> newServiceDecorator() {
return service -> new OpenTelemetryService(service, serverInstrumenter);
return service -> new OpenTelemetryFilibusterDecoratingHttpService(service, serverInstrumenter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.opentelemetry.instrumentation.armeria.v1_3;

import cloud.filibuster.instrumentation.datatypes.VectorClock;
import cloud.filibuster.instrumentation.storage.ContextStorage;
import io.opentelemetry.context.Context;
import javax.annotation.Nullable;
import java.util.logging.Level;
import java.util.logging.Logger;

import static io.opentelemetry.instrumentation.api.filibuster.OpenTelemetryContextStorageConstants.EXECUTION_INDEX_KEY;
import static io.opentelemetry.instrumentation.api.filibuster.OpenTelemetryContextStorageConstants.ORIGIN_VCLOCK_KEY;
import static io.opentelemetry.instrumentation.api.filibuster.OpenTelemetryContextStorageConstants.REQUEST_ID_KEY;
import static io.opentelemetry.instrumentation.api.filibuster.OpenTelemetryContextStorageConstants.VCLOCK_KEY;

public class OpenTelemetryContextStorage implements ContextStorage {
private static final Logger logger = Logger.getLogger(OpenTelemetryContextStorage.class.getName());

private Context context;

// Context.current() should maybe be cached, who knows?

public OpenTelemetryContextStorage() {
this.context = Context.current();
}

public Context getContext() {
return this.context;
}

@Override
@Nullable
public String getRequestId() {
return Context.current().get(REQUEST_ID_KEY);
}

@Override
@Nullable
public VectorClock getVectorClock() {
String vectorClockStr = Context.current().get(VCLOCK_KEY);

VectorClock newVclock = new VectorClock();

if (vectorClockStr != null) {
newVclock.fromString(vectorClockStr);
}

return newVclock;
}

@Override
@Nullable
public VectorClock getOriginVectorClock() {
String originVectorClockStr = Context.current().get(ORIGIN_VCLOCK_KEY);

VectorClock newVclock = new VectorClock();

if (originVectorClockStr != null) {
newVclock.fromString(originVectorClockStr);
}

return newVclock;
}

@Override
@Nullable
public String getExecutionIndex() {
return Context.current().get(EXECUTION_INDEX_KEY);
}

@Override
public void setRequestId(String requestId) {
this.context = this.context.with(REQUEST_ID_KEY, requestId);
logger.log(Level.SEVERE, "setRequestId: " + requestId);
}

@Override
public void setVectorClock(VectorClock vectorClock) {
this.context = this.context.with(VCLOCK_KEY, vectorClock.toString());
logger.log(Level.SEVERE, "setVectorClock: " + vectorClock);
}

@Override
public void setOriginVectorClock(VectorClock originVectorClock) {
this.context = this.context.with(ORIGIN_VCLOCK_KEY, originVectorClock.toString());
logger.log(Level.SEVERE, "setOriginVectorClock: " + originVectorClock);
}

@Override
public void setExecutionIndex(String executionIndex) {
this.context = this.context.with(EXECUTION_INDEX_KEY, executionIndex);
logger.log(Level.SEVERE, "setExecutionIndex: " + executionIndex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.opentelemetry.instrumentation.armeria.v1_3;

import cloud.filibuster.instrumentation.libraries.armeria.http.FilibusterDecoratingHttpClient;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.logging.RequestLog;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;

import javax.annotation.Nullable;
import java.util.logging.Level;
import java.util.logging.Logger;

public class OpenTelemetryFilibusterDecoratingHttpClient extends FilibusterDecoratingHttpClient {
private static final Logger logger = Logger.getLogger(OpenTelemetryFilibusterDecoratingHttpClient.class.getName());

@Nullable
private final Instrumenter<ClientRequestContext, RequestLog> clientInstrumentor;

@SuppressWarnings("NullAway")
private Context parentContext;

@SuppressWarnings("NullAway")
private Context context;

public OpenTelemetryFilibusterDecoratingHttpClient(HttpClient delegate, String serviceName, Instrumenter<ClientRequestContext, RequestLog> clientInstrumentor) {
super(delegate);
this.serviceName = serviceName;
this.clientInstrumentor = clientInstrumentor;
this.contextStorage = new OpenTelemetryContextStorage();
}

public OpenTelemetryFilibusterDecoratingHttpClient(HttpClient delegate, Instrumenter<ClientRequestContext, RequestLog> clientInstrumentor) {
super(delegate);
this.serviceName = System.getenv("SERVICE_NAME");
this.clientInstrumentor = clientInstrumentor;
this.contextStorage = new OpenTelemetryContextStorage();
}

@Override
protected void setupContext(ClientRequestContext ctx, HttpRequest req) {
this.parentContext = Context.current();
this.context = Context.current();

logger.log(Level.INFO, "****************************************************************");
logger.log(Level.SEVERE, "CLIENT parentContext: " + parentContext.toString());
logger.log(Level.INFO, "****************************************************************");

if (clientInstrumentor != null) {
this.context = clientInstrumentor.start(Context.current(), ctx);
}

logger.log(Level.INFO, "****************************************************************");
logger.log(Level.SEVERE, "CLIENT context: " + context.toString());
logger.log(Level.INFO, "****************************************************************");
}

@Override
protected void contextWhenComplete(ClientRequestContext ctx) {
ctx.log().whenComplete().thenAccept(log -> {
if (clientInstrumentor != null) {
clientInstrumentor.end(context, ctx, log, log.responseCause());
}
});
}

@Override
protected HttpResponse delegateWithContext(ClientRequestContext ctx, HttpRequest req) throws Exception {
HttpResponse response;

try (Scope ignored = context.makeCurrent()) {
logger.log(Level.INFO, "!!!!!!! with context: " + context.toString());
response = unwrap().execute(ctx, req);
}

return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.opentelemetry.instrumentation.armeria.v1_3;

import cloud.filibuster.instrumentation.libraries.armeria.http.FilibusterDecoratingHttpService;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;

import javax.annotation.Nullable;
import java.util.logging.Logger;

public class OpenTelemetryFilibusterDecoratingHttpService extends FilibusterDecoratingHttpService {
private static final Logger logger = Logger.getLogger(OpenTelemetryFilibusterDecoratingHttpService.class.getName());

@Nullable
private final Instrumenter<ServiceRequestContext, RequestLog> serverInstrumenter;

@SuppressWarnings("NullAway")
private Context context;

public OpenTelemetryFilibusterDecoratingHttpService(HttpService delegate, String serviceName, Instrumenter<ServiceRequestContext, RequestLog> serverInstrumenter) {
super(delegate);
this.serviceName = serviceName;
this.serverInstrumenter = serverInstrumenter;
this.contextStorage = new OpenTelemetryContextStorage();
}

public OpenTelemetryFilibusterDecoratingHttpService(HttpService delegate, Instrumenter<ServiceRequestContext, RequestLog> serverInstrumenter) {
super(delegate);
this.serviceName = System.getenv("SERVICE_NAME");
this.serverInstrumenter = serverInstrumenter;
this.contextStorage = new OpenTelemetryContextStorage();
}

@Override
protected void setupContext(ServiceRequestContext ctx, HttpRequest req) {
OpenTelemetryContextStorage openTelemetryContextStorage = (OpenTelemetryContextStorage) this.contextStorage;
context = openTelemetryContextStorage.getContext();

if (serverInstrumenter != null) {
context = serverInstrumenter.start(context, ctx);
}
}

@Override
protected void contextWhenComplete(ServiceRequestContext ctx) {
ctx.log().whenComplete().thenAccept(log -> {
if (serverInstrumenter != null) {
serverInstrumenter.end(context, ctx, log, log.responseCause());
}
});
}

@Override
protected HttpResponse delegateWithContext(ServiceRequestContext ctx, HttpRequest req) throws Exception {
try (Scope ignored = context.makeCurrent()) {
HttpService delegate = (HttpService) unwrap();
return delegate.serve(ctx, req);
}
}
}
17 changes: 17 additions & 0 deletions instrumentation/grpc-1.6/library/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,28 @@ val grpcVersion = "1.6.0"
dependencies {
library("io.grpc:grpc-core:$grpcVersion")

library("cloud.filibuster:instrumentation:0.19-SNAPSHOT")

testLibrary("io.grpc:grpc-netty:$grpcVersion")
testLibrary("io.grpc:grpc-protobuf:$grpcVersion")
testLibrary("io.grpc:grpc-services:$grpcVersion")
testLibrary("io.grpc:grpc-stub:$grpcVersion")

library("com.github.cliftonlabs:json-simple:2.1.2")
library("org.json:json:20210307")

testImplementation("org.assertj:assertj-core")
testImplementation(project(":instrumentation:grpc-1.6:testing"))
}

repositories {
mavenCentral()

maven {
url = uri("https://maven.pkg.github.com/filibuster-testing/filibuster-java")
credentials {
username = project.findProperty("gpr.user") as String? ?: System.getenv("GITHUB_USERNAME")
password = project.findProperty("gpr.key") as String? ?: System.getenv("GITHUB_TOKEN")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;

/** Entrypoint for tracing gRPC servers or clients. */
@SuppressWarnings("FieldCanBeLocal")
public final class GrpcTracing {

/** Returns a new {@link GrpcTracing} configured with the given {@link OpenTelemetry}. */
Expand Down Expand Up @@ -46,14 +47,14 @@ public static GrpcTracingBuilder newBuilder(OpenTelemetry openTelemetry) {
* io.grpc.ManagedChannelBuilder#intercept(ClientInterceptor...)}.
*/
public ClientInterceptor newClientInterceptor() {
return new TracingClientInterceptor(clientInstrumenter, propagators);
return new OpenTelemetryFilibusterClientInterceptor(clientInstrumenter, propagators);
}

/**
* Returns a new {@link ServerInterceptor} for use with methods like {@link
* io.grpc.ServerBuilder#intercept(ServerInterceptor)}.
*/
public ServerInterceptor newServerInterceptor() {
return new TracingServerInterceptor(serverInstrumenter, captureExperimentalSpanAttributes);
return new OpenTelemetryFilibusterServerInterceptor(serverInstrumenter);
}
}
Loading