Skip to content

Commit bbb177a

Browse files
authored
Support tracing for async producing, batch sync consuming, and batch async consuming in rocketMQ-client-java-5.x-plugin (#665)
1 parent d5b99f9 commit bbb177a

File tree

19 files changed

+1163
-131
lines changed

19 files changed

+1163
-131
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Release Notes.
1111
* Fix re-transform bug when plugin enhanced class proxy parent method.
1212
* Fix error HTTP status codes not recording as SLA failures in Vert.x plugins.
1313
* Support for HttpExchange request tracing
14+
* Support tracing for async producing, batch sync consuming, and batch async consuming in rocketMQ-client-java-5.x-plugin.
1415

1516
#### Documentation
1617

apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
2424
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
2525
import org.apache.skywalking.apm.agent.core.context.ContextManager;
26+
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
2627
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
2728
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
2829
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -37,6 +38,7 @@
3738
public class MessageListenerInterceptor implements InstanceMethodsAroundInterceptor {
3839

3940
public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
41+
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
4042

4143
@Override
4244
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
@@ -47,7 +49,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
4749
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + messageView.getTopic()
4850
+ "/Consumer", contextCarrier);
4951
Tags.MQ_TOPIC.set(span, messageView.getTopic());
50-
52+
span.tag(MQ_MESSAGE_ID, messageView.getMessageId().toString());
5153
Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
5254
if (skyWalkingDynamicField != null) {
5355
ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
20+
21+
import java.lang.reflect.Method;
22+
import java.util.Collection;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.concurrent.CompletableFuture;
26+
import org.apache.rocketmq.client.apis.message.Message;
27+
import org.apache.rocketmq.client.apis.message.MessageBuilder;
28+
import org.apache.rocketmq.client.apis.producer.SendReceipt;
29+
import org.apache.rocketmq.client.java.impl.ClientImpl;
30+
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
31+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
32+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
33+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
34+
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
35+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
36+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
37+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
38+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
39+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
40+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
41+
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
42+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
43+
import org.apache.skywalking.apm.util.StringUtil;
44+
45+
/**
46+
* {@link MessageSendAsyncInterceptor} create exit span when the method {@link org.apache.rocketmq.client.java.impl.producer.ProducerImpl#sendAsync(org.apache.rocketmq.client.apis.message.Message)}
47+
* execute
48+
*/
49+
public class MessageSendAsyncInterceptor implements InstanceMethodsAroundInterceptor {
50+
51+
public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
52+
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
53+
public static final StringTag MQ_MESSAGE_KEYS = new StringTag("mq.message.keys");
54+
public static final StringTag MQ_MESSAGE_TAGS = new StringTag("mq.message.tags");
55+
56+
@Override
57+
public void beforeMethod(EnhancedInstance objInst,
58+
Method method,
59+
Object[] allArguments,
60+
Class<?>[] argumentsTypes,
61+
MethodInterceptResult result) throws Throwable {
62+
Message message = (Message) allArguments[0];
63+
ClientImpl producerImpl = (ClientImpl) objInst;
64+
65+
ContextCarrier contextCarrier = new ContextCarrier();
66+
String namingServiceAddress = producerImpl.getClientConfiguration().getEndpoints();
67+
AbstractSpan span = ContextManager.createExitSpan(
68+
buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
69+
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
70+
Tags.MQ_BROKER.set(span, namingServiceAddress);
71+
Tags.MQ_TOPIC.set(span, message.getTopic());
72+
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
73+
Collection<String> keys = message.getKeys();
74+
if (!CollectionUtil.isEmpty(keys)) {
75+
span.tag(MQ_MESSAGE_KEYS, String.join(",", keys));
76+
}
77+
}
78+
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
79+
Optional<String> tag = message.getTag();
80+
tag.ifPresent(s -> span.tag(MQ_MESSAGE_TAGS, s));
81+
}
82+
83+
contextCarrier.extensionInjector().injectSendingTimestamp();
84+
SpanLayer.asMQ(span);
85+
86+
Map<String, String> properties = message.getProperties();
87+
CarrierItem next = contextCarrier.items();
88+
while (next.hasNext()) {
89+
next = next.next();
90+
if (!StringUtil.isEmpty(next.getHeadValue())) {
91+
properties.put(next.getHeadKey(), next.getHeadValue());
92+
}
93+
}
94+
95+
MessageBuilder messageBuilder = new MessageBuilderImpl();
96+
messageBuilder.setTopic(message.getTopic());
97+
if (message.getTag().isPresent()) {
98+
messageBuilder.setTag(message.getTag().get());
99+
}
100+
messageBuilder.setKeys(message.getKeys().toArray(new String[0]));
101+
if (message.getMessageGroup().isPresent()) {
102+
messageBuilder.setMessageGroup(message.getMessageGroup().get());
103+
}
104+
105+
byte[] body = new byte[message.getBody().limit()];
106+
message.getBody().get(body);
107+
messageBuilder.setBody(body);
108+
if (message.getDeliveryTimestamp().isPresent()) {
109+
messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
110+
}
111+
112+
properties.forEach(messageBuilder::addProperty);
113+
allArguments[0] = messageBuilder.build();
114+
}
115+
116+
@Override
117+
public Object afterMethod(EnhancedInstance objInst,
118+
Method method,
119+
Object[] allArguments,
120+
Class<?>[] argumentsTypes,
121+
Object ret) throws Throwable {
122+
CompletableFuture<SendReceipt> future = (CompletableFuture<SendReceipt>) ret;
123+
AbstractSpan span = ContextManager.activeSpan();
124+
span.prepareForAsync();
125+
ContextManager.stopSpan();
126+
return future.whenCompleteAsync((sendReceipt, throwable) -> {
127+
if (null != throwable) {
128+
span.log(throwable);
129+
span.errorOccurred();
130+
span.asyncFinish();
131+
return;
132+
}
133+
if (sendReceipt == null || sendReceipt.getMessageId() == null) {
134+
span.asyncFinish();
135+
return;
136+
}
137+
span.tag(MQ_MESSAGE_ID, sendReceipt.getMessageId().toString());
138+
span.asyncFinish();
139+
});
140+
}
141+
142+
@Override
143+
public void handleMethodException(EnhancedInstance objInst,
144+
Method method,
145+
Object[] allArguments,
146+
Class<?>[] argumentsTypes,
147+
Throwable t) {
148+
ContextManager.activeSpan().log(t);
149+
}
150+
151+
private String buildOperationName(String topicName) {
152+
return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
153+
}
154+
}

apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
2828
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
2929
import org.apache.skywalking.apm.agent.core.context.ContextManager;
30+
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
3031
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
3132
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
3233
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -50,6 +51,9 @@
5051
public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {
5152

5253
public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
54+
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
55+
public static final StringTag MQ_MESSAGE_KEYS = new StringTag("mq.message.keys");
56+
public static final StringTag MQ_MESSAGE_TAGS = new StringTag("mq.message.tags");
5357

5458
@Override
5559
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
@@ -62,13 +66,17 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
6266
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
6367
Tags.MQ_BROKER.set(span, namingServiceAddress);
6468
Tags.MQ_TOPIC.set(span, message.getTopic());
65-
Collection<String> keys = message.getKeys();
66-
if (!CollectionUtil.isEmpty(keys)) {
67-
span.tag(Tags.ofKey("mq.message.keys"), keys.stream().collect(Collectors.joining(",")));
69+
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
70+
Collection<String> keys = message.getKeys();
71+
if (!CollectionUtil.isEmpty(keys)) {
72+
span.tag(MQ_MESSAGE_KEYS, keys.stream().collect(Collectors.joining(",")));
73+
}
6874
}
69-
Optional<String> tag = message.getTag();
70-
if (tag.isPresent()) {
71-
span.tag(Tags.ofKey("mq.message.tags"), tag.get());
75+
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
76+
Optional<String> tag = message.getTag();
77+
if (tag.isPresent()) {
78+
span.tag(MQ_MESSAGE_TAGS, tag.get());
79+
}
7280
}
7381

7482
contextCarrier.extensionInjector().injectSendingTimestamp();
@@ -108,7 +116,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
108116
SendReceipt sendReceipt = (SendReceipt) ret;
109117
if (sendReceipt != null && sendReceipt.getMessageId() != null) {
110118
AbstractSpan activeSpan = ContextManager.activeSpan();
111-
activeSpan.tag(Tags.ofKey("mq.message.id"), sendReceipt.getMessageId().toString());
119+
activeSpan.tag(MQ_MESSAGE_ID, sendReceipt.getMessageId().toString());
112120
}
113121
ContextManager.stopSpan();
114122
return ret;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
20+
21+
import org.apache.skywalking.apm.agent.core.boot.PluginConfig;
22+
23+
public class RocketMqClientJavaPluginConfig {
24+
public static class Plugin {
25+
@PluginConfig(root = RocketMqClientJavaPluginConfig.class)
26+
public static class Rocketmqclient {
27+
/**
28+
* This config item controls that whether the RocketMqClientJava plugin should collect the keys of the message.
29+
*/
30+
public static boolean COLLECT_MESSAGE_KEYS = false;
31+
/**
32+
* This config item controls that whether the RocketMqClientJava plugin should collect the tags of the message.
33+
*/
34+
public static boolean COLLECT_MESSAGE_TAGS = false;
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)