Skip to content

Commit e4a7db1

Browse files
committed
Fix gRPC plugin not working for server side in some case
1 parent 9cb5a13 commit e4a7db1

File tree

6 files changed

+51
-36
lines changed

6 files changed

+51
-36
lines changed

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ Release Notes.
1212
* Remove Powermock entirely from the test cases.
1313
* Fix H2 instrumentation point
1414
* Refactor pipeline in jedis-plugin.
15-
* Enhance kotlin coroutine plugin for stack tracing.
1615
* Add plugin to support ClickHouse JDBC driver (0.3.2.*).
16+
* Change gRPC instrumentation point to fix plugin not working for server side.
1717

1818
#### Documentation
1919
* Update docs of Tracing APIs, reorganize the API docs into six parts.

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@
2626
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
2727

2828
import static net.bytebuddy.matcher.ElementMatchers.named;
29-
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
29+
import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
3030
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
3131

3232
public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
3333

3434
public static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder";
35-
public static final String ENHANCE_METHOD = "addService";
35+
public static final String ENHANCE_METHOD = "build";
3636
public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.server.AbstractServerImplBuilderInterceptor";
37-
public static final String ARGUMENT_TYPE = "io.grpc.ServerServiceDefinition";
3837

3938
@Override
4039
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -47,7 +46,7 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
4746
new InstanceMethodsInterceptPoint() {
4847
@Override
4948
public ElementMatcher<MethodDescription> getMethodsMatcher() {
50-
return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ARGUMENT_TYPE));
49+
return named(ENHANCE_METHOD).and(takesNoArguments());
5150
}
5251

5352
@Override

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
package org.apache.skywalking.apm.plugin.grpc.v1.server;
2020

21-
import io.grpc.ServerInterceptors;
22-
import io.grpc.ServerServiceDefinition;
21+
import io.grpc.ServerBuilder;
2322

2423
import java.lang.reflect.Method;
2524

@@ -34,7 +33,12 @@ public class AbstractServerImplBuilderInterceptor implements InstanceMethodsArou
3433
@Override
3534
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
3635
MethodInterceptResult result) {
37-
allArguments[0] = ServerInterceptors.intercept((ServerServiceDefinition) allArguments[0], new ServerInterceptor());
36+
if (objInst.getSkyWalkingDynamicField() == null) {
37+
ServerBuilder<?> builder = (ServerBuilder) objInst;
38+
ServerInterceptor interceptor = new ServerInterceptor();
39+
builder.intercept(interceptor);
40+
objInst.setSkyWalkingDynamicField(interceptor);
41+
}
3842
}
3943

4044
@Override

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,25 @@
1818

1919
package org.apache.skywalking.apm.plugin.grpc.v1.server;
2020

21+
import io.grpc.Context;
22+
import io.grpc.Contexts;
2123
import io.grpc.Metadata;
2224
import io.grpc.ServerCall;
2325
import io.grpc.ServerCallHandler;
2426
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
2527
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
28+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
29+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
30+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
31+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
32+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
2633
import org.apache.skywalking.apm.util.StringUtil;
2734

2835
public class ServerInterceptor implements io.grpc.ServerInterceptor {
36+
37+
static final Context.Key<ContextSnapshot> CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot");
38+
static final Context.Key<AbstractSpan> ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span");
39+
2940
@Override
3041
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
3142
Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
@@ -38,7 +49,26 @@ public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall
3849
next.setHeadValue(contextValue);
3950
}
4051
}
41-
return new TracingServerCallListener<>(handler.startCall(new TracingServerCall<>(call), headers), call
42-
.getMethodDescriptor(), contextCarrier);
52+
53+
final AbstractSpan span = ContextManager
54+
.createEntrySpan(call.getMethodDescriptor().getFullMethodName(), contextCarrier);
55+
span.setComponent(ComponentsDefine.GRPC);
56+
span.setLayer(SpanLayer.RPC_FRAMEWORK);
57+
ContextSnapshot contextSnapshot = ContextManager.capture();
58+
AbstractSpan asyncSpan = span.prepareForAsync();
59+
60+
Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan);
61+
62+
ServerCall.Listener<REQUEST> listener = Contexts.interceptCall(
63+
context,
64+
new TracingServerCall<>(call),
65+
headers,
66+
(serverCall, metadata) -> new TracingServerCallListener<>(
67+
handler.startCall(serverCall, metadata),
68+
serverCall.getMethodDescriptor()
69+
)
70+
);
71+
ContextManager.stopSpan(asyncSpan);
72+
return listener;
4373
}
4474
}

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void sendMessage(RESPONSE message) {
4949
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME);
5050
span.setComponent(ComponentsDefine.GRPC);
5151
span.setLayer(SpanLayer.RPC_FRAMEWORK);
52-
52+
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
5353
try {
5454
super.sendMessage(message);
5555
} catch (Throwable t) {
@@ -68,6 +68,7 @@ public void close(Status status, Metadata trailers) {
6868
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME);
6969
span.setComponent(ComponentsDefine.GRPC);
7070
span.setLayer(SpanLayer.RPC_FRAMEWORK);
71+
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
7172
switch (status.getCode()) {
7273
case OK:
7374
break;
@@ -93,6 +94,7 @@ public void close(Status status, Metadata trailers) {
9394
break;
9495
}
9596
Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name());
97+
ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
9698

9799
try {
98100
super.close(status, trailers);

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
import io.grpc.ForwardingServerCallListener;
2222
import io.grpc.MethodDescriptor;
2323
import io.grpc.ServerCall;
24-
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
2524
import org.apache.skywalking.apm.agent.core.context.ContextManager;
26-
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
2725
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
2826
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
2927
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
@@ -36,18 +34,11 @@
3634
public class TracingServerCallListener<REQUEST> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
3735
private final MethodDescriptor.MethodType methodType;
3836
private final String operationPrefix;
39-
private final String operation;
40-
private final ContextCarrier contextCarrier;
4137

42-
private AbstractSpan asyncSpan;
43-
private ContextSnapshot contextSnapshot;
44-
45-
protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor, ContextCarrier contextCarrier) {
38+
protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor) {
4639
super(delegate);
4740
this.methodType = descriptor.getType();
4841
this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER;
49-
this.operation = OperationNameFormatUtil.formatOperationName(descriptor);
50-
this.contextCarrier = contextCarrier;
5142
}
5243

5344
@Override
@@ -57,7 +48,7 @@ public void onMessage(REQUEST message) {
5748
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME);
5849
span.setComponent(ComponentsDefine.GRPC);
5950
span.setLayer(SpanLayer.RPC_FRAMEWORK);
60-
ContextManager.continued(contextSnapshot);
51+
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
6152
try {
6253
super.onMessage(message);
6354
} catch (Throwable t) {
@@ -73,21 +64,18 @@ public void onMessage(REQUEST message) {
7364

7465
@Override
7566
public void onCancel() {
76-
if (contextSnapshot == null) {
77-
return;
78-
}
7967
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME);
8068
span.setComponent(ComponentsDefine.GRPC);
8169
span.setLayer(SpanLayer.RPC_FRAMEWORK);
82-
ContextManager.continued(contextSnapshot);
70+
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
8371
try {
8472
super.onCancel();
8573
} catch (Throwable t) {
8674
ContextManager.activeSpan().log(t);
8775
throw t;
8876
} finally {
8977
ContextManager.stopSpan();
90-
asyncSpan.asyncFinish();
78+
ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
9179
}
9280
}
9381

@@ -96,7 +84,7 @@ public void onHalfClose() {
9684
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME);
9785
span.setComponent(ComponentsDefine.GRPC);
9886
span.setLayer(SpanLayer.RPC_FRAMEWORK);
99-
ContextManager.continued(contextSnapshot);
87+
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
10088
try {
10189
super.onHalfClose();
10290
} catch (Throwable t) {
@@ -110,18 +98,10 @@ public void onHalfClose() {
11098
@Override
11199
public void onComplete() {
112100
super.onComplete();
113-
asyncSpan.asyncFinish();
114101
}
115102

116103
@Override
117104
public void onReady() {
118-
final AbstractSpan span = ContextManager.createEntrySpan(operation, contextCarrier);
119-
span.setComponent(ComponentsDefine.GRPC);
120-
span.setLayer(SpanLayer.RPC_FRAMEWORK);
121-
contextSnapshot = ContextManager.capture();
122-
asyncSpan = span.prepareForAsync();
123-
ContextManager.stopSpan(asyncSpan);
124-
125105
super.onReady();
126106
}
127107
}

0 commit comments

Comments
 (0)