Skip to content

Commit

Permalink
Netty4: handle read timeout (#3613)
Browse files Browse the repository at this point in the history
* Netty4: handle read timeout

* spotless
  • Loading branch information
laurit authored Jul 21, 2021
1 parent a55e048 commit 1197480
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@

package io.opentelemetry.javaagent.instrumentation.netty.v4_0;

import static io.opentelemetry.javaagent.instrumentation.netty.v4_0.server.NettyHttpServerTracer.tracer;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.NettyHttpClientTracer;
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.server.NettyHttpServerTracer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -32,19 +36,29 @@ public ElementMatcher<TypeDescription> typeMatcher() {
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("notifyHandlerException"))
.and(named("invokeExceptionCaught"))
.and(takesArgument(0, named(Throwable.class.getName()))),
AbstractChannelHandlerContextInstrumentation.class.getName()
+ "$NotifyHandlerExceptionAdvice");
+ "$InvokeExceptionCaughtAdvice");
}

@SuppressWarnings("unused")
public static class NotifyHandlerExceptionAdvice {
public static class InvokeExceptionCaughtAdvice {

@Advice.OnMethodEnter
public static void onEnter(@Advice.Argument(0) Throwable throwable) {
public static void onEnter(
@Advice.This ChannelHandlerContext channelContext,
@Advice.Argument(0) Throwable throwable) {
if (throwable != null) {
tracer().onException(Java8BytecodeBridge.currentContext(), throwable);
Attribute<Context> clientContextAttr =
channelContext.channel().attr(AttributeKeys.CLIENT_CONTEXT);
Context context = clientContextAttr.get();
if (context != null) {
NettyHttpClientTracer.tracer().endExceptionally(context, throwable);
} else {
NettyHttpServerTracer.tracer()
.onException(Java8BytecodeBridge.currentContext(), throwable);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.timeout.ReadTimeoutHandler
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
Expand All @@ -31,24 +32,39 @@ class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup()

@Shared
private Bootstrap bootstrap
private Bootstrap bootstrap = buildBootstrap()

def setupSpec() {
bootstrap = new Bootstrap()
@Shared
private Bootstrap readTimeoutBootstrap = buildBootstrap(true)

def cleanupSpec() {
eventLoopGroup?.shutdownGracefully()
}

Bootstrap buildBootstrap(boolean readTimeout = false) {
Bootstrap bootstrap = new Bootstrap()
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
if (readTimeout) {
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS))
}
pipeline.addLast(new HttpClientCodec())
}
})

return bootstrap
}

def cleanupSpec() {
eventLoopGroup?.shutdownGracefully()
Bootstrap getBootstrap(URI uri) {
if (uri.getPath() == "/read-timeout") {
return readTimeoutBootstrap
}
return bootstrap
}

@Override
Expand All @@ -62,7 +78,7 @@ class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement

@Override
int sendRequest(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers) {
def channel = bootstrap.connect(uri.host, getPort(uri)).sync().channel()
def channel = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get()
Expand All @@ -73,7 +89,7 @@ class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
Channel ch
try {
ch = bootstrap.connect(uri.host, getPort(uri)).sync().channel()
ch = getBootstrap(uri).connect(uri.host, getPort(uri)).sync().channel()
} catch (Exception exception) {
requestResult.complete(exception)
return
Expand Down Expand Up @@ -121,4 +137,9 @@ class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
boolean testHttps() {
false
}

@Override
boolean testReadTimeout() {
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package io.opentelemetry.javaagent.instrumentation.netty.v4_1;

import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.server.NettyHttpServerTracer.tracer;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.NettyHttpClientTracer;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.server.NettyHttpServerTracer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -38,9 +43,18 @@ public void transform(TypeTransformer transformer) {
public static class InvokeExceptionCaughtAdvice {

@Advice.OnMethodEnter
public static void onEnter(@Advice.Argument(0) Throwable throwable) {
public static void onEnter(
@Advice.This ChannelHandlerContext channelContext,
@Advice.Argument(0) Throwable throwable) {
if (throwable != null) {
tracer().onException(Java8BytecodeBridge.currentContext(), throwable);
if (channelContext.channel().hasAttr(AttributeKeys.CLIENT_CONTEXT)) {
Attribute<Context> clientContextAttr =
channelContext.channel().attr(AttributeKeys.CLIENT_CONTEXT);
NettyHttpClientTracer.tracer().endExceptionally(clientContextAttr.get(), throwable);
} else {
NettyHttpServerTracer.tracer()
.onException(Java8BytecodeBridge.currentContext(), throwable);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.timeout.ReadTimeoutHandler
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.instrumentation.test.AgentTestTrait
Expand All @@ -48,11 +49,14 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
@Shared
private Bootstrap httpsBootstrap = buildBootstrap(true)

@Shared
private Bootstrap readTimeoutBootstrap = buildBootstrap(false, true)

def cleanupSpec() {
eventLoopGroup?.shutdownGracefully()
}

Bootstrap buildBootstrap(boolean https) {
Bootstrap buildBootstrap(boolean https, boolean readTimeout = false) {
Bootstrap bootstrap = new Bootstrap()
bootstrap.group(eventLoopGroup)
.channel(getChannelClass())
Expand All @@ -65,6 +69,9 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
SslContext sslContext = SslContextBuilder.forClient().build()
pipeline.addLast(sslContext.newHandler(socketChannel.alloc()))
}
if (readTimeout) {
pipeline.addLast(new ReadTimeoutHandler(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS))
}
pipeline.addLast(new HttpClientCodec())
}
})
Expand All @@ -81,7 +88,12 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
}

Bootstrap getBootstrap(URI uri) {
return uri.getScheme() == "https" ? httpsBootstrap : bootstrap
if (uri.getScheme() == "https") {
return httpsBootstrap
} else if (uri.getPath() == "/read-timeout") {
return readTimeoutBootstrap
}
return bootstrap
}

@Override
Expand Down Expand Up @@ -144,6 +156,11 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
false
}

@Override
boolean testReadTimeout() {
true
}

def "test connection reuse and second request with lazy execute"() {
setup:
//Create a simple Netty pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class RatpackForkedHttpClientTest extends RatpackHttpClientTest {
def resp = client.request(uri) { spec ->
// Connect timeout for the whole client was added in 1.5 so we need to add timeout for each request
spec.connectTimeout(Duration.ofSeconds(2))
if (uri.getPath() == "/read-timeout") {
spec.readTimeout(Duration.ofMillis(READ_TIMEOUT_MS))
}
spec.method(method)
spec.headers { headersSpec ->
headers.entrySet().each {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package client

import io.netty.channel.ConnectTimeoutException
import io.netty.handler.timeout.ReadTimeoutException
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.asserts.SpanAssert
Expand Down Expand Up @@ -81,6 +82,9 @@ class RatpackHttpClientTest extends HttpClientTest<Void> implements AgentTestTra
def resp = client.request(uri) { spec ->
// Connect timeout for the whole client was added in 1.5 so we need to add timeout for each request
spec.connectTimeout(Duration.ofSeconds(2))
if (uri.getPath() == "/read-timeout") {
spec.readTimeout(Duration.ofMillis(READ_TIMEOUT_MS))
}
spec.method(method)
spec.headers { headersSpec ->
headers.entrySet().each {
Expand Down Expand Up @@ -119,10 +123,13 @@ class RatpackHttpClientTest extends HttpClientTest<Void> implements AgentTestTra

@Override
void assertClientSpanErrorEvent(SpanAssert spanAssert, URI uri, Throwable exception) {
switch (uri.toString()) {
case "https://192.0.2.1/": // non routable address
spanAssert.errorEvent(ConnectTimeoutException, ~/connection timed out:/)
return
// non routable address
if (uri.toString() == "https://192.0.2.1/") {
spanAssert.errorEvent(ConnectTimeoutException, ~/connection timed out:/)
return
} else if (uri.getPath() == "/read-timeout") {
spanAssert.errorEvent(ReadTimeoutException)
return
}
super.assertClientSpanErrorEvent(spanAssert, uri, exception)
}
Expand All @@ -147,4 +154,9 @@ class RatpackHttpClientTest extends HttpClientTest<Void> implements AgentTestTra
// these tests will pass, but they don't really test anything since REQUEST is Void
false
}

@Override
boolean testReadTimeout() {
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import spock.lang.Unroll
abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
protected static final BODY_METHODS = ["POST", "PUT"]
protected static final CONNECT_TIMEOUT_MS = 5000
protected static final READ_TIMEOUT_MS = 2000
protected static final BASIC_AUTH_KEY = "custom-authorization-header"
protected static final BASIC_AUTH_VAL = "plain text auth token"

Expand Down Expand Up @@ -108,6 +109,11 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
.service("/to-secured") {ctx, req ->
HttpResponse.ofRedirect(HttpStatus.FOUND, "/secured")
}
.service("/read-timeout") {ctx, req ->
Thread.sleep(READ_TIMEOUT_MS * 5)
ResponseHeadersBuilder headers = ResponseHeaders.builder(HttpStatus.OK)
HttpResponse.of(headers.build(), HttpData.ofAscii("Should have timed out."))
}
.decorator(new DecoratingHttpServiceFunction() {
@Override
HttpResponse serve(HttpService delegate, ServiceRequestContext ctx, HttpRequest req) {
Expand Down Expand Up @@ -745,6 +751,31 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
method = "HEAD"
}

def "read timeout"() {
given:
assumeTrue(testReadTimeout())
def uri = resolveAddress("/read-timeout")

when:
runWithSpan("parent") {
doRequest(method, uri)
}

then:
def ex = thrown(Exception)
def thrownException = ex instanceof ExecutionException ? ex.cause : ex
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "parent", null, thrownException)
clientSpan(it, 1, span(0), method, uri, null, thrownException)
serverSpan(it, 2, span(1))
}
}

where:
method = "GET"
}

// IBM JVM has different protocol support for TLS
@Requires({ !System.getProperty("java.vm.name").contains("IBM J9 VM") })
def "test https request"() {
Expand Down Expand Up @@ -1067,6 +1098,10 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
true
}

boolean testReadTimeout() {
false
}

boolean testRemoteConnection() {
true
}
Expand Down

0 comments on commit 1197480

Please sign in to comment.