Skip to content

Commit

Permalink
新增Thrift instrumentation 支持 #3
Browse files Browse the repository at this point in the history
  • Loading branch information
liurui committed May 10, 2023
1 parent 63aae00 commit c0cd413
Show file tree
Hide file tree
Showing 34 changed files with 1,446 additions and 0 deletions.
18 changes: 18 additions & 0 deletions instrumentation/apache-thrift/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.apache.thrift")
module.set("libthrift")
versions.set("[0.9.3,)")
assertInverse.set(true)
}
}


dependencies {
implementation(project(":instrumentation:apache-thrift:library-autoconfigure"))
library("org.apache.thrift:libthrift:0.9.3")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.instrumentation.apachethrift.ThriftAsyncMethodCallback;
import io.opentelemetry.instrumentation.apachethrift.ThriftConstants;
import net.bytebuddy.asm.Advice;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncMethodCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncMethodCallConstructorAdvice {
private static final Logger logger = LoggerFactory.getLogger(AsyncMethodCallConstructorAdvice.class);

@SuppressWarnings({"rawtypes","unchecked"})
@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(@Advice.This TAsyncMethodCall objInst
, @Advice.AllArguments Object[] args) {
if (args[3] instanceof AsyncMethodCallback) {
AsyncMethodCallback<Object> callback = (AsyncMethodCallback) args[3];
try {
ThriftConstants.setValue(TAsyncMethodCall.class, objInst, "callback", new ThriftAsyncMethodCallback<Object>(callback,null));
} catch (Exception e) {
logger.error("set value error:", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.apachethrift.TClientContext;
import io.opentelemetry.instrumentation.apachethrift.ThriftAsyncMethodCallback;
import io.opentelemetry.instrumentation.apachethrift.ThriftConstants;
import net.bytebuddy.asm.Advice;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncMethodCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.CONTEXT_THREAD;
import static io.opentelemetry.instrumentation.apachethrift.ThriftSingletons.clientInstrumenter;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;


public class AsyncMethodCallMethodAdvice {
public static final Logger logger = LoggerFactory.getLogger(AsyncMethodCallMethodAdvice.class);

@SuppressWarnings({"unchecked","rawtypes","unused"})
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This TAsyncMethodCall methodCall,
@Advice.AllArguments Object[] args,
@Advice.FieldValue("callback") AsyncMethodCallback<Object> callback) {
Context parentContext = currentContext();
TClientContext request = new TClientContext(methodCall.getClass().getName(),null);
if (!clientInstrumenter().shouldStart(parentContext, request)) {
return;
}
Context context = clientInstrumenter().start(parentContext, request);
Scope scope = context.makeCurrent();
Span span = Span.fromContext(context);
CONTEXT_THREAD.set(request);
try {
ThriftConstants.setValue(TAsyncMethodCall.class, methodCall, "callback", new ThriftAsyncMethodCallback<Object>(callback, context));
} catch (Exception e) {
if (logger.isDebugEnabled()){
logger.debug("set value callback fail",e);
}
logger.error("set value callback fail",e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.instrumentation.apachethrift.CTProtocolFactory;
import io.opentelemetry.instrumentation.apachethrift.ThriftConstants;
import net.bytebuddy.asm.Advice;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.protocol.TProtocolFactory;

public class TAsyncClientConstructorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit(@Advice.This TAsyncClient tAsyncClient
, @Advice.FieldValue("___protocolFactory") TProtocolFactory protocolFactory
) throws NoSuchFieldException, IllegalAccessException {
ThriftConstants.setValue(
TAsyncClient.class,
tAsyncClient,
"___protocolFactory",
new CTProtocolFactory(protocolFactory)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.TASYNC_CLIENT;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

public class TAsyncClientInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named(TASYNC_CLIENT);
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(isConstructor()
,this.getClass().getPackage().getName() + ".TAsyncClientConstructorAdvice");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.T_ASYNC_METHOD_CALL;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;

public class TAsyncMethodCallInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return extendsClass(named(T_ASYNC_METHOD_CALL));
}

@Override
public void transform(TypeTransformer transformer) {

transformer.applyAdviceToMethod(isConstructor()
,this.getClass().getPackage().getName()+ ".AsyncMethodCallConstructorAdvice");
transformer.applyAdviceToMethod(isMethod()
.and(isProtected())
.and(named("prepareMethodCall"))
,this.getClass().getPackage().getName() + ".AsyncMethodCallMethodAdvice");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.instrumentation.apachethrift.AsyncContext;
import io.opentelemetry.instrumentation.apachethrift.ServerInProtocolWrapper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.AbstractNonblockingServer;

import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.CONTEXT_THREAD;
import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.T_BASE_ASYNC_PROCESSOR;
import static io.opentelemetry.instrumentation.apachethrift.ThriftSingletons.serverInstrumenter;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

@SuppressWarnings("unchecked")
public class TBaseAsyncProcessorInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return extendsClass(named(T_BASE_ASYNC_PROCESSOR));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(isMethod()
.and(isPublic())
.and(named("process"))
, getClass().getName() + "$AsyncProcessAdvice");
}

@SuppressWarnings({"unchecked","rawtypes"})
public static class AsyncProcessAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.This TBaseAsyncProcessor tBaseAsyncProcessor
, @Advice.AllArguments Object[] args) {
TProtocol protocol = ((AbstractNonblockingServer.AsyncFrameBuffer) args[0]).getInputProtocol();
((ServerInProtocolWrapper) protocol).initial(new AsyncContext(tBaseAsyncProcessor.getProcessMapView()),currentContext());
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(@Advice.Thrown Throwable throwable) {
serverInstrumenter().end(currentContext(), CONTEXT_THREAD.get(), null, throwable);
CONTEXT_THREAD.remove();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.apachethrift.ServerInProtocolWrapper;
import io.opentelemetry.instrumentation.apachethrift.ThriftContext;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.thrift.TBaseProcessor;

import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.CONTEXT_THREAD;
import static io.opentelemetry.instrumentation.apachethrift.ThriftConstants.T_BASE_PROCESSOR;
import static io.opentelemetry.instrumentation.apachethrift.ThriftSingletons.serverInstrumenter;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

public class TBaseProcessorInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return extendsClass(named(T_BASE_PROCESSOR));
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(isMethod()
.and(isPublic())
.and(named("process"))
, getClass().getName() + "$ProcessAdvice");
}

@SuppressWarnings({"unchecked","rawtypes"})
public static class ProcessAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void before(@Advice.This Object obj
, @Advice.AllArguments Object[] args) {
if (obj instanceof TBaseProcessor) {
Object in = args[0];
if (in instanceof ServerInProtocolWrapper) {
TBaseProcessor tBaseProcessor = (TBaseProcessor) obj;
((ServerInProtocolWrapper) in).initial(new ThriftContext(tBaseProcessor.getProcessMapView()),Context.current());
}
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(@Advice.Thrown Throwable throwable) {
Context context = Context.current();
if (context!=null) {
serverInstrumenter().end(Context.current(), CONTEXT_THREAD.get(), null, throwable);
CONTEXT_THREAD.remove();
}
//todo end
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.opentelemetry.javaagent.instrumentation.apachethrift;

import io.opentelemetry.instrumentation.apachethrift.ClientOutProtocolWrapper;
import io.opentelemetry.instrumentation.apachethrift.ThriftConstants;
import net.bytebuddy.asm.Advice;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TProtocol;

public class TClientConstructorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit(@Advice.This TServiceClient tServiceClient
, @Advice.FieldValue("oprot_") TProtocol oprot
) throws NoSuchFieldException, IllegalAccessException {
if (!(oprot instanceof ClientOutProtocolWrapper)) {
ThriftConstants.setValue(
TServiceClient.class,
tServiceClient,
"oprot_",
new ClientOutProtocolWrapper(oprot)
);
}
}
}
Loading

0 comments on commit c0cd413

Please sign in to comment.