Skip to content

Commit

Permalink
#43 and #45
Browse files Browse the repository at this point in the history
  • Loading branch information
songlonqi-java committed Jul 11, 2023
1 parent 5664db4 commit ab83eb8
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ public class MessageMapSetter implements AgentPropagation.Setter<MessageBuilderI
@Override
public void set(MessageBuilderImpl carrier, String key, String value) {
if (log.isDebugEnabled()) {
// System.out.println("dubbo Inject " + key + ":\t" + value);
log.debug("dubbo Inject {} :\t {}" , key , value);
log.debug("rocketmq Inject {} :\t {}" , key , value);
}
carrier.addProperty(key,value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public void forEachKey(MessageView carrier, AgentPropagation.KeyClassifier class
log.debug("Extract size: {}",properties.entrySet().size());
}
for (Map.Entry<String,String> entry : properties.entrySet()){
log.debug("Extract "+entry.getKey()+"\t"+entry.getValue());
System.out.println("Extract "+entry.getKey()+"\t"+entry.getValue());
if (null != entry.getValue()) {
if (!classifier.accept(entry.getKey(), entry.getValue())) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.instrumentation.rocketmq;

import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
Expand All @@ -22,7 +21,7 @@ public class RocketMqDecorator extends BaseDecorator {
private static final String BROKER_HOST = "bornHost";
private static final String BROKER_ADDR = "bornAddr";
private static final String BROKER_NAME = "brokerName";
private static final String TOPIC = "brokerName";
private static final String TOPIC = "topic";
private static final String MESSAGING_ROCKETMQ_TAGS = "messaging.rocketmq.tags";
private static final String MESSAGING_ROCKETMQ_BROKER_ADDRESS = "messaging.rocketmq.broker_address";
private static final String MESSAGING_ROCKETMQ_SEND_RESULT = "messaging.rocketmq.send_result";
Expand All @@ -48,16 +47,17 @@ protected CharSequence component() {
return null;
}

private static final String LOCAL_SERVICE_NAME =
Config.get().getServiceName();
private static final String LOCAL_SERVICE_NAME = "rocketmq";


public AgentScope start(ConsumeMessageContext context) {
MessageExt ext = context.getMsgList().get(0);
AgentSpan.Context parentContext = propagate().extract(ext, GETTER);
UTF8BytesString name = UTF8BytesString.create(ext.getTopic() + " send");
UTF8BytesString name = UTF8BytesString.create(ext.getTopic() + " receive");
final AgentSpan span = startSpan(name, parentContext);
span.setResourceName(LOCAL_SERVICE_NAME);
span.setResourceName(name);

span.setServiceName(LOCAL_SERVICE_NAME);

span.setTag(BROKER_NAME, ext.getBrokerName());
String tags = ext.getTags();
Expand Down Expand Up @@ -94,10 +94,11 @@ public AgentScope start(SendMessageContext context) {
String topic = context.getMessage().getTopic();
UTF8BytesString spanName = UTF8BytesString.create(topic + " send");
final AgentSpan span = startSpan(spanName);
span.setResourceName(LOCAL_SERVICE_NAME);
span.setResourceName(spanName);

span.setTag(BROKER_HOST, context.getBornHost());
span.setTag(BROKER_ADDR, context.getBrokerAddr());

span.setServiceName(LOCAL_SERVICE_NAME);
if (context.getMessage() != null) {
String tags = context.getMessage().getTags();
if (tags != null) {
Expand Down Expand Up @@ -129,7 +130,10 @@ public void end(SendMessageContext context, AgentScope scope) {
if (null != exception) {
onError(scope, exception);
}
scope.span().setTag(MESSAGING_ROCKETMQ_SEND_RESULT, context.getSendResult().getSendStatus().name());
if (context.getSendResult() != null&&context.getSendResult().getSendStatus() != null){
scope.span().setTag(MESSAGING_ROCKETMQ_SEND_RESULT, context.getSendResult().getSendStatus().name());
}

beforeFinish(scope.span());
scope.close();
scope.span().finish();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package datadog.trace.instrumentation.rocketmq;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;

public final class RocketMqHook {
public static final ConsumeMessageHook CONSUME_MESSAGE_HOOK = new TracingConsumeMessageHookImpl();

public static final SendMessageHook SEND_MESSAGE_HOOK = new TracingSendMessageHookImpl();
public static ConsumeMessageHook buildConsumerHook(ContextStore<ConsumeMessageContext, AgentScope> contextStore){
return new TracingConsumeMessageHookImpl(contextStore);
}
public static SendMessageHook buildSendHook(ContextStore<SendMessageContext, AgentScope> contextStore){
return new TracingSendMessageHookImpl(contextStore);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package datadog.trace.instrumentation.rocketmq;


import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import java.util.HashMap;
import java.util.Map;

import static net.bytebuddy.matcher.ElementMatchers.*;

Expand Down Expand Up @@ -43,6 +48,13 @@ public String[] helperClassNames() {
};
}

@Override
public Map<String, String> contextStore() {
Map<String, String> map = new HashMap<>(1);
map.put("org.apache.rocketmq.client.hook.ConsumeMessageContext", "datadog.trace.bootstrap.instrumentation.api.AgentScope");
return map;
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {

Expand All @@ -60,8 +72,11 @@ public static void onEnter(
@Advice.FieldValue(
value = "defaultMQPushConsumerImpl", declaringType = DefaultMQPushConsumer.class)
DefaultMQPushConsumerImpl defaultMqPushConsumerImpl) {
defaultMqPushConsumerImpl.registerConsumeMessageHook(
RocketMqHook.CONSUME_MESSAGE_HOOK);
final ContextStore<ConsumeMessageContext, AgentScope> contextStore =
InstrumentationContext.get(ConsumeMessageContext.class, AgentScope.class);

defaultMqPushConsumerImpl.registerConsumeMessageHook(RocketMqHook.buildConsumerHook(contextStore));

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

import java.util.HashMap;
import java.util.Map;

import static net.bytebuddy.matcher.ElementMatchers.*;

@AutoService(Instrumenter.class)
Expand All @@ -20,11 +27,6 @@ public RocketMqSendInstrumentation() {
super("rocketmq", "rocketmq-client");
}

// @Override
// public ElementMatcher<ClassLoader> classLoaderMatcher() {
// return hasClassesNamed(CLASS_NAME);
// }

@Override
public String hierarchyMarkerType() {
return CLASS_NAME;
Expand All @@ -46,6 +48,12 @@ public String[] helperClassNames() {
packageName + ".TextMapInjectAdapter",
};
}
@Override
public Map<String, String> contextStore() {
Map<String, String> map = new HashMap<>(1);
map.put("org.apache.rocketmq.client.hook.SendMessageContext", "datadog.trace.bootstrap.instrumentation.api.AgentScope");
return map;
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
Expand All @@ -61,8 +69,10 @@ public static class AdviceStart {
public static void onEnter(
@Advice.FieldValue(value = "defaultMQProducerImpl", declaringType = DefaultMQProducer.class)
DefaultMQProducerImpl defaultMqProducerImpl) {
defaultMqProducerImpl.registerSendMessageHook(
RocketMqHook.SEND_MESSAGE_HOOK);
final ContextStore<SendMessageContext, AgentScope> contextStore =
InstrumentationContext.get(SendMessageContext.class, AgentScope.class);

defaultMqProducerImpl.registerSendMessageHook(RocketMqHook.buildSendHook(contextStore));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package datadog.trace.instrumentation.rocketmq;


import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;

final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
public final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
private final RocketMqDecorator rocketMqDecorator;
private final ContextStore<ConsumeMessageContext,AgentScope> scopeAccessor;

TracingConsumeMessageHookImpl() {
TracingConsumeMessageHookImpl(ContextStore<ConsumeMessageContext,AgentScope> scopeAccessor) {
this.rocketMqDecorator = new RocketMqDecorator();
this.scopeAccessor = scopeAccessor;
}

@Override
Expand All @@ -23,18 +25,16 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
return;
}
AgentScope scope = rocketMqDecorator.start(context);
Object o = context.getMqTraceContext();
if (o == null){
context.setMqTraceContext(scope);
}
System.out.println("start Span and put to ContextStore");
scopeAccessor.put(context,scope);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
AgentScope scope = (AgentScope) context.getMqTraceContext();
AgentScope scope = scopeAccessor.get(context);
if (scope!=null){
rocketMqDecorator.end(context, scope);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package datadog.trace.instrumentation.rocketmq;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;

final class TracingSendMessageHookImpl implements SendMessageHook {
public final class TracingSendMessageHookImpl implements SendMessageHook {

private final RocketMqDecorator rocketMqDecorator;
private final ContextStore<SendMessageContext,AgentScope> scopeAccessor;

TracingSendMessageHookImpl() {
TracingSendMessageHookImpl(ContextStore<SendMessageContext,AgentScope> scopeAccessor) {
this.rocketMqDecorator = new RocketMqDecorator();
this.scopeAccessor = scopeAccessor;
}

@Override
Expand All @@ -23,18 +26,16 @@ public void sendMessageBefore(SendMessageContext context) {
return;
}
AgentScope scope = rocketMqDecorator.start(context);
Object o = context.getMqTraceContext();
if( o == null){
context.setMqTraceContext(scope);
}
scopeAccessor.put(context,scope);

}

@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null) {
return;
}
AgentScope scope = (AgentScope) context.getMqTraceContext();
AgentScope scope = scopeAccessor.get(context);
if (scope != null) {
rocketMqDecorator.end(context, scope);
}
Expand Down

0 comments on commit ab83eb8

Please sign in to comment.