Skip to content

Commit

Permalink
rocketmq
Browse files Browse the repository at this point in the history
  • Loading branch information
songlonqi-java committed Jul 27, 2023
1 parent ef78da1 commit afa0f43
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ public void onFailure(Throwable t) {
scope.finish();
}
}

// Tried to close datadog.trace.agent.core.scopemanager.ContinuableScope@4373e60c->
// DDSpan [ t_id=9178204451511121555, s_id=8568375034643761603, p_id=3362837340041904303 ] trace=order-center/dubbo/com.k1k.ordercenter.order.rpc.api.OrderRpcCmdServiceI.placeOrder(PlaceOrderCmd) tags={_sample_rate=1, component=apache-dubbo, dubbo_method=placeOrder, dubbo_short_url=registry://hw-test-nacos.aidyd.com:8848/com.k1k.ordercenter.order.rpc.api.OrderRpcCmdServiceI.placeOrder(PlaceOrderCmd), dubbo_side=provider, dubbo_url=registry://hw-test-nacos.aidyd.com:8848/org.apache.dubbo.registry.RegistryService?application=order&dubbo=2.0.2&namespace=rpc-dev&pid=9&registry=nacos&release=3.0.5&timestamp=1690192157364, dubbo_version=3.0.5, env=test, language=jvm, process_id=9, runtime-id=e9750af1-b4a0-47d7-a5b3-37f28220ac15, thread.id=4817, thread.name=DubboServerHandler-172.20.202.60:20895-thread-200},duration_ns=0
// scope when not on top.
// Current top: datadog.trace.agent.core.scopemanager.ContinuableScope@65c8e2ec->
// DDSpan [ t_id=9178204451511121555, s_id=4239552935852319480, p_id=8568375034643761603 ] trace=order-center/ORDER_AUTO_CLOSE_TOPIC send/order-center tags={bornAddr=10.20.0.10:10101, bornHost=172.20.202.60, brokerName=ORDER_AUTO_CLOSE_TOPIC, env=test, messaging.rocketmq.broker_address=10.20.0.10:10101, messaging.rocketmq.tags=ORDER_AUTO_CLOSE_TAG, thread.id=4817, thread.name=DubboServerHandler-172.20.202.60:20895-thread-200}, duration_ns=0
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.SocketAddress;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*;
Expand Down Expand Up @@ -87,12 +86,13 @@ private static String getBrokerHost(SocketAddress storeHost) {
return storeHost.toString().replace("/", "");
}

public void end(ConsumeMessageContext context, AgentScope scope) {
public void end(ConsumeMessageContext context) {
String status = context.getStatus();
AgentScope scope = activeScope();
scope.span().setTag("status", status);
beforeFinish(scope);
scope.close();
scope.span().finish();
scope.close();
if (log.isDebugEnabled()){
log.debug("consumer span end");
}
Expand Down Expand Up @@ -136,8 +136,9 @@ public AgentScope start(SendMessageContext context) {
return scope;
}

public void end(SendMessageContext context, AgentScope scope) {
public void end(SendMessageContext context) {
Exception exception = context.getException();
AgentScope scope = activeScope();
if (null != exception) {
onError(scope, exception);
}
Expand All @@ -146,8 +147,8 @@ public void end(SendMessageContext context, AgentScope scope) {
}

beforeFinish(scope);
scope.close();
scope.span().finish();
scope.close();
if (log.isDebugEnabled()){
log.debug("consumer span end");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import org.apache.rocketmq.client.hook.SendMessageHook;

public final class RocketMqHook {
public static ConsumeMessageHook buildConsumerHook(ContextStore<ConsumeMessageContext, AgentScope> contextStore){
return new TracingConsumeMessageHookImpl(contextStore);
public static ConsumeMessageHook buildConsumerHook(){
return new TracingConsumeMessageHookImpl();
}
public static SendMessageHook buildSendHook(ContextStore<SendMessageContext, AgentScope> contextStore){
return new TracingSendMessageHookImpl(contextStore);
public static SendMessageHook buildSendHook(){
return new TracingSendMessageHookImpl();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ public static void onEnter(
@Advice.FieldValue(
value = "defaultMQPushConsumerImpl", declaringType = DefaultMQPushConsumer.class)
DefaultMQPushConsumerImpl defaultMqPushConsumerImpl) {
final ContextStore<ConsumeMessageContext, AgentScope> contextStore =
InstrumentationContext.get(ConsumeMessageContext.class, AgentScope.class);

defaultMqPushConsumerImpl.registerConsumeMessageHook(RocketMqHook.buildConsumerHook(contextStore));
defaultMqPushConsumerImpl.registerConsumeMessageHook(RocketMqHook.buildConsumerHook());

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ public String[] helperClassNames() {
packageName + ".TextMapInjectAdapter",
};
}
@Override
public Map<String, String> contextStore() {
Map<String, String> map = new HashMap<>(1);
map.put("org.apache.rocketmq.client.hook.SendMessageContext", "datadog.trace.bootstrap.instrumentation.api.AgentScope");
return map;
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
Expand All @@ -69,10 +63,8 @@ public static class AdviceStart {
public static void onEnter(
@Advice.FieldValue(value = "defaultMQProducerImpl", declaringType = DefaultMQProducer.class)
DefaultMQProducerImpl defaultMqProducerImpl) {
final ContextStore<SendMessageContext, AgentScope> contextStore =
InstrumentationContext.get(SendMessageContext.class, AgentScope.class);

defaultMqProducerImpl.registerSendMessageHook(RocketMqHook.buildSendHook(contextStore));
defaultMqProducerImpl.registerSendMessageHook(RocketMqHook.buildSendHook());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

public final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
private final RocketMqDecorator rocketMqDecorator;
private final ContextStore<ConsumeMessageContext,AgentScope> scopeAccessor;
//private final ContextStore<ConsumeMessageContext,AgentScope> scopeAccessor;

TracingConsumeMessageHookImpl(ContextStore<ConsumeMessageContext,AgentScope> scopeAccessor) {
TracingConsumeMessageHookImpl() {
this.rocketMqDecorator = new RocketMqDecorator();
this.scopeAccessor = scopeAccessor;
// this.scopeAccessor = scopeAccessor;
}

@Override
Expand All @@ -24,20 +24,15 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
AgentScope scope = rocketMqDecorator.start(context);
// System.out.println("start Span and put to ContextStore");
scopeAccessor.put(context,scope);
rocketMqDecorator.start(context);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
AgentScope scope = scopeAccessor.get(context);
if (scope!=null){
rocketMqDecorator.end(context, scope);
}
rocketMqDecorator.end(context);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
public final class TracingSendMessageHookImpl implements SendMessageHook {

private final RocketMqDecorator rocketMqDecorator;
private final ContextStore<SendMessageContext,AgentScope> scopeAccessor;
// private final ContextStore<SendMessageContext,AgentScope> scopeAccessor;

TracingSendMessageHookImpl(ContextStore<SendMessageContext,AgentScope> scopeAccessor) {
private AgentScope scope;

TracingSendMessageHookImpl() {
this.rocketMqDecorator = new RocketMqDecorator();
this.scopeAccessor = scopeAccessor;
// this.scopeAccessor = scopeAccessor;
}

@Override
Expand All @@ -25,19 +27,14 @@ public void sendMessageBefore(SendMessageContext context) {
if (context == null) {
return;
}
AgentScope scope = rocketMqDecorator.start(context);
scopeAccessor.put(context,scope);

rocketMqDecorator.start(context);
}

@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null) {
return;
}
AgentScope scope = scopeAccessor.get(context);
if (scope != null) {
rocketMqDecorator.end(context, scope);
}
rocketMqDecorator.end(context);
}
}

0 comments on commit afa0f43

Please sign in to comment.