Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customization of netty channel handles before and during decompression #10261

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0925b7e
Make the decompressor extensible and allow for a plugin to define a s…
cwperks Sep 27, 2023
a8d9b73
Add to CHANGELOG
cwperks Sep 27, 2023
5188697
Use getMethods
cwperks Sep 28, 2023
cd9e72f
Create new instance of each inbound handler
cwperks Sep 28, 2023
bf2d707
Update name
cwperks Sep 28, 2023
bfc3ceb
Update name
cwperks Sep 28, 2023
2584e94
Update test
cwperks Sep 28, 2023
e02a8a3
Add netty request tests
cwperks Sep 28, 2023
0a0214d
Merge branch 'main' into improve-compressed-requests
cwperks Sep 28, 2023
104c512
Add test for createRestRequest
cwperks Sep 28, 2023
aec43e9
Very basic header validator
peternied Sep 28, 2023
46f3e4a
Revert "Very basic header validator"
cwperks Sep 29, 2023
16ecd7f
Remove createDecompressor extension point in favor of attributeKey th…
cwperks Sep 29, 2023
e6209c7
Minor update
cwperks Sep 29, 2023
54a0a96
Match previous name
cwperks Sep 29, 2023
f4eb416
Add license header
cwperks Sep 29, 2023
226299a
Back out DelegatingRestHandler changes to simplify this PR and follow…
cwperks Sep 29, 2023
4689e30
Small update to test
cwperks Sep 29, 2023
c227e6e
remove printStackTrace
cwperks Sep 29, 2023
a83c64f
Merge branch 'main' into improve-compressed-requests
cwperks Oct 3, 2023
aec3ad3
Remove channel attributes that are request specific
cwperks Oct 4, 2023
01dfa89
Move new AttributeKeys to security plugin
cwperks Oct 4, 2023
a1d6968
Merge branch 'main' into improve-compressed-requests
cwperks Oct 4, 2023
3085f64
Add charset
cwperks Oct 4, 2023
7ca4c7e
Add javadoc on new extension points
cwperks Oct 5, 2023
4c49159
Merge branch 'main' into improve-compressed-requests
cwperks Oct 5, 2023
23feffd
Merge branch 'improve-compressed-requests' of https://github.com/cwpe…
cwperks Oct 5, 2023
02b92ab
Single request class
cwperks Oct 5, 2023
5af481b
Revert access modifier changes
cwperks Oct 5, 2023
91fc5bc
Spotless
cwperks Oct 5, 2023
ddaca29
Remove createRestRequest changes in favor of new security rest channe…
cwperks Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "Very basic header validator"
This reverts commit aec43e9.

Signed-off-by: Craig Perkins <cwperx@amazon.com>
  • Loading branch information
cwperks committed Sep 29, 2023
commit 46f3e4a8ff9262d0b3f61e7c0e1170402ddfd7b4
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,18 @@
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.util.Collection;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.IntStream;

import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCounted;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;

import io.netty.handler.codec.http2.HttpConversionUtil;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;

@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4Http2IT extends OpenSearchNetty4IntegTestCase {

Expand Down Expand Up @@ -64,30 +56,6 @@ public void testThatNettyHttpServerSupportsHttp2GetUpgrades() throws Exception {
}
}


public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
blockedRequest.headers().add("blockme", "Not Allowed");
blockedRequest.headers().add(HOST, "localhost");
blockedRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http");


final List<FullHttpResponse> responses = new ArrayList<>();
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2() ) {
try {
FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest);
responses.add(blockedResponse);
assertThat(blockedResponse.status().code(), equalTo(401));
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

public void testThatNettyHttpServerSupportsHttp2PostUpgrades() throws Exception {
final List<Tuple<String, CharSequence>> requests = List.of(Tuple.tuple("/_search", "{\"query\":{ \"match_all\":{}}}"));

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.http.HttpPipelinedRequest;
import org.opensearch.rest.RestHandlerContext;
import org.opensearch.rest.RestResponse;

import io.netty.channel.ChannelHandler;
Expand All @@ -53,9 +54,12 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
final RestResponse earlyResponse = ctx.channel().attr(Netty4HttpServerTransport.EARLY_RESPONSE).get();
peternied marked this conversation as resolved.
Show resolved Hide resolved
final ThreadContext.StoredContext contextToRestore = ctx.channel().attr(Netty4HttpServerTransport.CONTEXT_TO_RESTORE).get();
peternied marked this conversation as resolved.
Show resolved Hide resolved
final RestHandlerContext requestContext = new RestHandlerContext(earlyResponse, contextToRestore);
boolean success = false;
try {
serverTransport.incomingRequest(httpRequest, channel);
serverTransport.incomingRequest(httpRequest, channel, requestContext);
success = true;
} finally {
if (success == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.common.util.net.NetUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
Expand All @@ -52,6 +53,7 @@
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpReadTimeoutException;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.rest.RestResponse;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -334,11 +336,16 @@ public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings);
}

protected static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel");
public static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel");
peternied marked this conversation as resolved.
Show resolved Hide resolved
protected static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance(
"opensearch-http-server-channel"
);

public static final AttributeKey<RestResponse> EARLY_RESPONSE = AttributeKey.newInstance("opensearch-http-early-response");
public static final AttributeKey<ThreadContext.StoredContext> CONTEXT_TO_RESTORE = AttributeKey.newInstance(
"opensearch-http-request-thread-context"
);

protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

private final Netty4HttpServerTransport transport;
Expand Down Expand Up @@ -420,14 +427,10 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
pipeline.replace(this, "header_verifier", transport.createHeaderVerifier());
peternied marked this conversation as resolved.
Show resolved Hide resolved
pipeline.addAfter("header_verifier", "decoder_compress", new HttpContentDecompressor());
pipeline.addAfter("decoder_compress", "aggregator", aggregator);
pipeline.addAfter("header_verifier", "decompress", transport.createDecompressor());
pipeline.addAfter("decompress", "aggregator", aggregator);
if (handlingSettings.isCompression()) {
pipeline.addAfter(
"aggregator",
"encoder_compress",
new HttpContentCompressor(handlingSettings.getCompressionLevel())
);
pipeline.addAfter("aggregator", "compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
pipeline.addBefore("handler", "request_creator", requestCreator);
pipeline.addBefore("handler", "response_creator", responseCreator);
Expand All @@ -447,13 +450,13 @@ protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) {
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
pipeline.addLast("decoder", decoder);
pipeline.addLast("header_verifier", transport.createHeaderVerifier());
peternied marked this conversation as resolved.
Show resolved Hide resolved
pipeline.addLast("decoder_compress", new HttpContentDecompressor());
pipeline.addLast("decompress", transport.createDecompressor());
pipeline.addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
pipeline.addLast("aggregator", aggregator);
if (handlingSettings.isCompression()) {
pipeline.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
pipeline.addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
pipeline.addLast("request_creator", requestCreator);
pipeline.addLast("response_creator", responseCreator);
Expand Down Expand Up @@ -488,18 +491,16 @@ protected void initChannel(Channel childChannel) throws Exception {

final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);

childChannel.pipeline()
.addLast(new LoggingHandler(LogLevel.DEBUG))
.addLast(new Http2StreamFrameToHttpObjectCodec(true))
.addLast("byte_buf_sizer", byteBufSizer)
.addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS))
.addLast("header_verifier", transport.createHeaderVerifier())
.addLast("decoder_decompress", new HttpContentDecompressor());
.addLast("decompress", transport.createDecompressor());

if (handlingSettings.isCompression()) {
childChannel.pipeline()
.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
childChannel.pipeline().addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}

childChannel.pipeline()
Expand Down Expand Up @@ -534,9 +535,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

protected HttpContentDecompressor createDecompressor() {
peternied marked this conversation as resolved.
Show resolved Hide resolved
return new HttpContentDecompressor();
}

protected ChannelInboundHandlerAdapter createHeaderVerifier() {
peternied marked this conversation as resolved.
Show resolved Hide resolved
peternied marked this conversation as resolved.
Show resolved Hide resolved
return new Netty4HeaderVerifier();
// pass-through
// return new ChannelInboundHandlerAdapter();
return new ChannelInboundHandlerAdapter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.nio.SocketChannelContext;
import org.opensearch.nio.TaskScheduler;
import org.opensearch.nio.WriteOperation;
import org.opensearch.rest.RestHandlerContext;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -172,7 +173,7 @@ private void handleRequest(Object msg) {
final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg;
boolean success = false;
try {
transport.incomingRequest(pipelinedRequest, nioHttpChannel);
transport.incomingRequest(pipelinedRequest, nioHttpChannel, RestHandlerContext.EMPTY);
peternied marked this conversation as resolved.
Show resolved Hide resolved
success = true;
} finally {
if (success == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.nio.InboundChannelBuffer;
import org.opensearch.nio.SocketChannelContext;
import org.opensearch.nio.TaskScheduler;
import org.opensearch.rest.RestHandlerContext;
import org.opensearch.rest.RestRequest;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -101,7 +102,7 @@ public void setMocks() {
doAnswer(invocation -> {
((HttpRequest) invocation.getArguments()[0]).releaseAndCopy();
return null;
}).when(transport).incomingRequest(any(HttpRequest.class), any(HttpChannel.class));
}).when(transport).incomingRequest(any(HttpRequest.class), any(HttpChannel.class), any(RestHandlerContext.class));
Settings settings = Settings.builder().put(SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(1024)).build();
HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings);
channel = mock(NioHttpChannel.class);
Expand All @@ -122,12 +123,12 @@ public void testSuccessfulDecodeHttpRequest() throws IOException {
try {
handler.consumeReads(toChannelBuffer(slicedBuf));

verify(transport, times(0)).incomingRequest(any(HttpRequest.class), any(NioHttpChannel.class));
verify(transport, times(0)).incomingRequest(any(HttpRequest.class), any(NioHttpChannel.class), any(RestHandlerContext.class));

handler.consumeReads(toChannelBuffer(slicedBuf2));

ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class));
verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class), any(RestHandlerContext.class));

HttpRequest nioHttpRequest = requestCaptor.getValue();
assertEquals(HttpRequest.HttpVersion.HTTP_1_1, nioHttpRequest.protocolVersion());
Expand All @@ -153,7 +154,7 @@ public void testDecodeHttpRequestError() throws IOException {
handler.consumeReads(toChannelBuffer(buf));

ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class));
verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class), any(RestHandlerContext.class));

assertNotNull(requestCaptor.getValue().getInboundException());
assertTrue(requestCaptor.getValue().getInboundException() instanceof IllegalArgumentException);
Expand All @@ -174,7 +175,7 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t
} finally {
buf.release();
}
verify(transport, times(0)).incomingRequest(any(), any());
verify(transport, times(0)).incomingRequest(any(), any(), any(RestHandlerContext.class));

List<FlushOperation> flushOperations = handler.pollFlushOperations();
assertFalse(flushOperations.isEmpty());
Expand Down Expand Up @@ -280,7 +281,7 @@ private void prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOEx
}

ArgumentCaptor<HttpPipelinedRequest> requestCaptor = ArgumentCaptor.forClass(HttpPipelinedRequest.class);
verify(transport, atLeastOnce()).incomingRequest(requestCaptor.capture(), any(HttpChannel.class));
verify(transport, atLeastOnce()).incomingRequest(requestCaptor.capture(), any(HttpChannel.class), any(RestHandlerContext.class));

HttpRequest httpRequest = requestCaptor.getValue();
assertNotNull(httpRequest);
Expand Down
Loading