Skip to content

[7.x] ThreadContext response header leak #68651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.netty.channel.SimpleChannelInboundHandler;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.Transports;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
Expand All @@ -25,6 +26,8 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
assert Transports.assertTransportThread();
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
try {
Expand All @@ -40,6 +43,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest http
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());

Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(channel, new Exception(cause));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,8 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
return NO_OP;
}
}

public ThreadPool getThreadPool() {
return threadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public void sendResponse(RestResponse restResponse) {

ActionListener<Void> listener = ActionListener.wrap(() -> Releasables.close(toClose));
httpChannel.sendResponse(httpResponse, listener);
// try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
// httpChannel.sendResponse(httpResponse, listener);
// }
success = true;
} finally {
if (success == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public static boolean assertNotTransportThread(String reason) {
public static boolean assertDefaultThreadContext(ThreadContext threadContext) {
assert threadContext.getRequestHeadersOnly().isEmpty() ||
threadContext.getRequestHeadersOnly().size() == 1 && threadContext.getRequestHeadersOnly().containsKey(Task.X_OPAQUE_ID) :
"expected empty context but was " + threadContext.getRequestHeadersOnly() + " on " + Thread.currentThread().getName();
"[request headers] expected empty context but was " + threadContext.getRequestHeadersOnly()
+ " "+threadContext.getResponseHeaders() + " on " + Thread.currentThread().getName();
assert threadContext.getResponseHeaders().isEmpty() :
"[response headers] expected empty context but was " + threadContext.getResponseHeaders()
+ " on " + Thread.currentThread().getName();
return true;
}
}