Skip to content

Commit fad934c

Browse files
Implemented WebSocket support for Netty 4.1
1 parent 52b943d commit fad934c

File tree

8 files changed

+378
-10
lines changed

8 files changed

+378
-10
lines changed

dd-java-agent/instrumentation/netty-4.1-shared/src/main/java/datadog/trace/instrumentation/netty41/AttributeKeys.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import datadog.trace.api.GenericClassValue;
66
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
77
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
8+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
89
import io.netty.handler.codec.http.HttpHeaders;
910
import io.netty.util.AttributeKey;
1011
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,9 @@ public final class AttributeKeys {
3334
public static final AttributeKey<Boolean> BLOCKED_RESPONSE_KEY =
3435
attributeKey("datadog.server.blocked_response");
3536

37+
public static final AttributeKey<HandlerContext.Sender> WEBSOCKET_HANDLER_CONTEXT =
38+
attributeKey("datadog.server.websocket.handler_context");
39+
3640
/**
3741
* Generate an attribute key or reuse the one existing in the global app map. This implementation
3842
* creates attributes only once even if the current class is loaded by several class loaders and

dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/NettyChannelPipelineInstrumentation.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
import datadog.trace.instrumentation.netty41.client.HttpClientRequestTracingHandler;
1919
import datadog.trace.instrumentation.netty41.client.HttpClientResponseTracingHandler;
2020
import datadog.trace.instrumentation.netty41.client.HttpClientTracingHandler;
21-
import datadog.trace.instrumentation.netty41.server.HttpServerRequestTracingHandler;
22-
import datadog.trace.instrumentation.netty41.server.HttpServerResponseTracingHandler;
23-
import datadog.trace.instrumentation.netty41.server.HttpServerTracingHandler;
24-
import datadog.trace.instrumentation.netty41.server.MaybeBlockResponseHandler;
21+
import datadog.trace.instrumentation.netty41.server.*;
22+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketProtocolHandshakeHandler;
23+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerTracingHandler;
2524
import io.netty.channel.ChannelHandler;
2625
import io.netty.channel.ChannelHandlerContext;
2726
import io.netty.channel.ChannelPipeline;
@@ -31,6 +30,7 @@
3130
import io.netty.handler.codec.http.HttpResponseDecoder;
3231
import io.netty.handler.codec.http.HttpResponseEncoder;
3332
import io.netty.handler.codec.http.HttpServerCodec;
33+
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
3434
import io.netty.util.Attribute;
3535
import net.bytebuddy.asm.Advice;
3636
import net.bytebuddy.description.type.TypeDescription;
@@ -77,6 +77,10 @@ public String[] helperClassNames() {
7777
packageName + ".server.HttpServerResponseTracingHandler",
7878
packageName + ".server.HttpServerTracingHandler",
7979
packageName + ".server.MaybeBlockResponseHandler",
80+
packageName + ".server.websocket.WebSocketProtocolHandshakeHandler",
81+
packageName + ".server.websocket.WebSocketServerTracingHandler",
82+
packageName + ".server.websocket.WebSocketServerResponseTracingHandler",
83+
packageName + ".server.websocket.WebSocketServerRequestTracingHandler",
8084
packageName + ".NettyHttp2Helper",
8185
};
8286
}
@@ -139,15 +143,19 @@ public static void addHandler(
139143
try {
140144
ChannelHandler toAdd = null;
141145
ChannelHandler toAdd2 = null;
146+
ChannelHandler toAdd3 = null;
142147
// Server pipeline handlers
143148
if (handler instanceof HttpServerCodec) {
144149
toAdd = new HttpServerTracingHandler();
145150
toAdd2 = MaybeBlockResponseHandler.INSTANCE;
151+
toAdd3 = new WebSocketServerTracingHandler();
146152
} else if (handler instanceof HttpRequestDecoder) {
147153
toAdd = HttpServerRequestTracingHandler.INSTANCE;
148154
} else if (handler instanceof HttpResponseEncoder) {
149155
toAdd = HttpServerResponseTracingHandler.INSTANCE;
150156
toAdd2 = MaybeBlockResponseHandler.INSTANCE;
157+
} else if (handler instanceof WebSocketServerProtocolHandler) {
158+
toAdd = WebSocketProtocolHandshakeHandler.INSTANCE;
151159
} else
152160
// Client pipeline handlers
153161
if (handler instanceof HttpClientCodec) {
@@ -180,6 +188,13 @@ public static void addHandler(
180188
pipeline.remove(existing2);
181189
}
182190
pipeline.addAfter(pipeline.context(toAdd).name(), null, toAdd2);
191+
if (toAdd3 != null) {
192+
ChannelHandler existing3 = pipeline.get(toAdd3.getClass());
193+
if (existing3 != null) {
194+
pipeline.remove(existing3);
195+
}
196+
pipeline.addAfter(pipeline.context(toAdd2).name(), null, toAdd3);
197+
}
183198
}
184199
}
185200
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_HANDLER_CONTEXT;
4+
5+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
6+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
7+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
8+
import io.netty.channel.ChannelHandler;
9+
import io.netty.channel.ChannelHandlerContext;
10+
import io.netty.channel.ChannelInboundHandlerAdapter;
11+
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
12+
13+
@ChannelHandler.Sharable
14+
public class WebSocketProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
15+
public static WebSocketProtocolHandshakeHandler INSTANCE =
16+
new WebSocketProtocolHandshakeHandler();
17+
18+
@Override
19+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
20+
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
21+
// WebSocket Handshake Completed
22+
final AgentSpan current = AgentTracer.get().activeSpan();
23+
if (current != null) {
24+
ctx.channel()
25+
.attr(WEBSOCKET_HANDLER_CONTEXT)
26+
.set(
27+
new HandlerContext.Sender(
28+
current.getLocalRootSpan(), ctx.channel().id().asShortText()));
29+
}
30+
}
31+
super.userEventTriggered(ctx, evt);
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.decorator.WebsocketDecorator.DECORATE;
5+
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_HANDLER_CONTEXT;
6+
7+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
9+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
10+
import io.netty.channel.*;
11+
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
12+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
13+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
14+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
15+
16+
@ChannelHandler.Sharable
17+
public class WebSocketServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
18+
public static WebSocketServerRequestTracingHandler INSTANCE =
19+
new WebSocketServerRequestTracingHandler();
20+
21+
@Override
22+
public void channelRead(ChannelHandlerContext ctx, Object frame) {
23+
24+
if (!(frame instanceof WebSocketFrame)) {
25+
ctx.fireChannelRead(frame);
26+
return;
27+
}
28+
29+
Channel channel = ctx.channel();
30+
final HandlerContext.Sender sessionState = channel.attr(WEBSOCKET_HANDLER_CONTEXT).get();
31+
if (sessionState == null) {
32+
return;
33+
}
34+
35+
final HandlerContext.Receiver handlerContext =
36+
new HandlerContext.Receiver(
37+
sessionState.getHandshakeSpan(), ctx.channel().id().asShortText());
38+
39+
if (frame instanceof TextWebSocketFrame) {
40+
// WebSocket Read Text Start
41+
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
42+
43+
final AgentSpan span =
44+
DECORATE.onReceiveFrameStart(
45+
handlerContext, textFrame.text(), textFrame.isFinalFragment());
46+
if (span == null) {
47+
ctx.fireChannelRead(textFrame);
48+
} else {
49+
try (final AgentScope scope = activateSpan(span)) {
50+
ctx.fireChannelRead(textFrame);
51+
52+
// WebSocket Read Text Start
53+
if (textFrame.isFinalFragment()) {
54+
DECORATE.onFrameEnd(handlerContext);
55+
}
56+
}
57+
}
58+
return;
59+
}
60+
61+
if (frame instanceof BinaryWebSocketFrame) {
62+
// WebSocket Read Binary Start
63+
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
64+
final AgentSpan span =
65+
DECORATE.onReceiveFrameStart(
66+
handlerContext, binaryFrame.content().nioBuffer(), binaryFrame.isFinalFragment());
67+
if (span == null) {
68+
ctx.fireChannelRead(binaryFrame);
69+
} else {
70+
try (final AgentScope scope = activateSpan(span)) {
71+
ctx.fireChannelRead(binaryFrame);
72+
73+
// WebSocket Read Binary End
74+
if (binaryFrame.isFinalFragment()) {
75+
DECORATE.onFrameEnd(handlerContext);
76+
}
77+
}
78+
}
79+
return;
80+
}
81+
82+
if (frame instanceof CloseWebSocketFrame) {
83+
// WebSocket Closed by client
84+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
85+
int statusCode = closeFrame.statusCode();
86+
String reasonText = closeFrame.reasonText();
87+
channel.attr(WEBSOCKET_HANDLER_CONTEXT).remove();
88+
final AgentSpan span =
89+
DECORATE.onSessionCloseReceived(handlerContext, reasonText, statusCode);
90+
if (span == null) {
91+
ctx.fireChannelRead(closeFrame);
92+
} else {
93+
try (final AgentScope scope = activateSpan(span)) {
94+
ctx.fireChannelRead(closeFrame);
95+
96+
if (closeFrame.isFinalFragment()) {
97+
DECORATE.onFrameEnd(handlerContext);
98+
}
99+
}
100+
}
101+
}
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
4+
import static datadog.trace.bootstrap.instrumentation.decorator.WebsocketDecorator.DECORATE;
5+
import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_BINARY;
6+
import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_TEXT;
7+
import static datadog.trace.instrumentation.netty41.AttributeKeys.WEBSOCKET_HANDLER_CONTEXT;
8+
9+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
10+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
11+
import datadog.trace.bootstrap.instrumentation.websocket.HandlerContext;
12+
import io.netty.channel.*;
13+
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
14+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
15+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
17+
18+
@ChannelHandler.Sharable
19+
public class WebSocketServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
20+
public static WebSocketServerResponseTracingHandler INSTANCE =
21+
new WebSocketServerResponseTracingHandler();
22+
23+
@Override
24+
public void write(ChannelHandlerContext ctx, Object frame, ChannelPromise promise)
25+
throws Exception {
26+
if (!(frame instanceof WebSocketFrame)) {
27+
ctx.write(frame, promise);
28+
return;
29+
}
30+
31+
Channel channel = ctx.channel();
32+
HandlerContext.Sender handlerContext = channel.attr(WEBSOCKET_HANDLER_CONTEXT).get();
33+
if (handlerContext == null) {
34+
ctx.write(frame, promise);
35+
return;
36+
}
37+
38+
if (frame instanceof TextWebSocketFrame) {
39+
// WebSocket Write Text Start
40+
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
41+
final AgentSpan span =
42+
DECORATE.onSendFrameStart(handlerContext, MESSAGE_TYPE_TEXT, textFrame.text().length());
43+
if (span == null) {
44+
ctx.write(frame, promise);
45+
} else {
46+
try (final AgentScope scope = activateSpan(span)) {
47+
ctx.write(frame, promise);
48+
49+
// WebSocket Write Text End
50+
if (textFrame.isFinalFragment()) {
51+
DECORATE.onFrameEnd(handlerContext);
52+
}
53+
}
54+
}
55+
return;
56+
}
57+
58+
if (frame instanceof BinaryWebSocketFrame) {
59+
// WebSocket Write Binary Start
60+
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
61+
final AgentSpan span =
62+
DECORATE.onSendFrameStart(
63+
handlerContext, MESSAGE_TYPE_BINARY, binaryFrame.content().readableBytes());
64+
if (span == null) {
65+
ctx.write(frame, promise);
66+
} else {
67+
try (final AgentScope scope = activateSpan(span)) {
68+
ctx.write(frame, promise);
69+
70+
// WebSocket Write Binary End
71+
if (binaryFrame.isFinalFragment()) {
72+
DECORATE.onFrameEnd(handlerContext);
73+
}
74+
}
75+
}
76+
return;
77+
}
78+
79+
if (frame instanceof CloseWebSocketFrame) {
80+
// WebSocket Closed by Server
81+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
82+
int statusCode = closeFrame.statusCode();
83+
String reasonText = closeFrame.reasonText();
84+
channel.attr(WEBSOCKET_HANDLER_CONTEXT).remove();
85+
final AgentSpan span = DECORATE.onSessionCloseIssued(handlerContext, reasonText, statusCode);
86+
if (span == null) {
87+
ctx.write(frame, promise);
88+
} else {
89+
try (final AgentScope scope = activateSpan(span)) {
90+
ctx.write(frame, promise);
91+
if (closeFrame.isFinalFragment()) {
92+
DECORATE.onFrameEnd(handlerContext);
93+
}
94+
}
95+
}
96+
}
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package datadog.trace.instrumentation.netty41.server.websocket;
2+
3+
import io.netty.channel.CombinedChannelDuplexHandler;
4+
5+
public class WebSocketServerTracingHandler
6+
extends CombinedChannelDuplexHandler<
7+
WebSocketServerRequestTracingHandler, WebSocketServerResponseTracingHandler> {
8+
9+
public WebSocketServerTracingHandler() {
10+
super(
11+
WebSocketServerRequestTracingHandler.INSTANCE,
12+
WebSocketServerResponseTracingHandler.INSTANCE);
13+
}
14+
}

0 commit comments

Comments
 (0)