Skip to content

Commit

Permalink
fix #218 add predicate based http server compression
Browse files Browse the repository at this point in the history
HttpServerBuilder now exposes a compression(BiPredicate<Req,Res>).
The predicate is evaluated before committing response header/status.
If the predicate is true, compression is enabled for the given response.
The minimum compression threshold now uses content-length predicate.
  • Loading branch information
Stephane Maldini authored and smaldini committed Feb 28, 2018
1 parent 015c89b commit 288a3bc
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 17 deletions.
8 changes: 2 additions & 6 deletions src/main/java/reactor/ipc/netty/http/HttpOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public Mono<Void> then() {
msg = outboundHttpMessage();
}


checkIfNotPersistent();
preSendHeadersAndStatus();

return channel().writeAndFlush(msg);
}
Expand All @@ -141,10 +140,7 @@ public Mono<Void> then() {
});
}

protected void checkIfNotPersistent(){
//default doesn't imply anything - only server usually implies if connection
// should default persist (keep-alive) when response is not self defined
}
protected abstract void preSendHeadersAndStatus();

protected abstract HttpMessage newFullEmptyBodyMessage();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,11 @@ else if (version.equals(HttpVersion.HTTP_1_1)) {
throw new IllegalStateException(version.protocolName() + " not supported");
}

@Override
protected void preSendHeadersAndStatus() {
//Noop
}

@Override
protected void onHandlerStart() {
applyHandler();
Expand Down
54 changes: 51 additions & 3 deletions src/main/java/reactor/ipc/netty/http/server/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.NetUtil;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
Expand Down Expand Up @@ -233,6 +234,43 @@ public void startRouterAndAwait(Consumer<? super HttpServerRoutes> routesBuilder

static final LoggingHandler loggingHandler = new LoggingHandler(HttpServer.class);

static BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate(
HttpServerOptions options) {

int minResponseSize = options.minCompressionResponseSize();

if (minResponseSize < 0) {
return null;
}

if (minResponseSize == 0) {
return options.compressionPredicate();
}

BiPredicate<HttpServerRequest, HttpServerResponse> lengthPredicate =
(req, res) -> {
String length = res.responseHeaders()
.get(HttpHeaderNames.CONTENT_LENGTH);

if (length == null) {
return true;
}

try {
return Long.parseLong(length) >= minResponseSize;
}
catch (NumberFormatException nfe) {
return true;
}
};

if (options.compressionPredicate() == null) {
return lengthPredicate;
}

return lengthPredicate.and(options.compressionPredicate());
}

final class TcpBridgeServer extends TcpServer
implements BiConsumer<ChannelPipeline, ContextHandler<Channel>> {

Expand All @@ -244,14 +282,24 @@ final class TcpBridgeServer extends TcpServer
protected ContextHandler<Channel> doHandler(
BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
MonoSink<NettyContext> sink) {

BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate =
compressPredicate(options);

boolean alwaysCompress = compressPredicate == null && options.minCompressionResponseSize() == 0;

return ContextHandler.newServerContext(sink,
options,
loggingHandler,
(ch, c, msg) -> {

HttpServerOperations ops = HttpServerOperations.bindHttp(ch, handler, c, msg);
HttpServerOperations ops = HttpServerOperations.bindHttp(ch,
handler,
c,
compressPredicate,
msg);

if(options.minCompressionResponseSize() >= 0) {
if (alwaysCompress) {
ops.compression(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -78,15 +79,18 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
static HttpServerOperations bindHttp(Channel channel,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,
ContextHandler<?> context,
BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate,
Object msg) {
return new HttpServerOperations(channel, handler, context, (HttpRequest) msg);
return new HttpServerOperations(channel, handler, context, compressionPredicate,(HttpRequest) msg);
}

final HttpResponse nettyResponse;
final HttpHeaders responseHeaders;
final Cookies cookieHolder;
final HttpRequest nettyRequest;

final BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate;

Function<? super String, Map<String, String>> paramsResolver;

HttpServerOperations(Channel ch, HttpServerOperations replaced) {
Expand All @@ -96,16 +100,19 @@ static HttpServerOperations bindHttp(Channel channel,
this.nettyResponse = replaced.nettyResponse;
this.paramsResolver = replaced.paramsResolver;
this.nettyRequest = replaced.nettyRequest;
this.compressionPredicate = replaced.compressionPredicate;
}

HttpServerOperations(Channel ch,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler,
ContextHandler<?> context,
BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate,
HttpRequest nettyRequest) {
super(ch, handler, context);
this.nettyRequest = Objects.requireNonNull(nettyRequest, "nettyRequest");
this.nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
this.responseHeaders = nettyResponse.headers();
this.compressionPredicate = compressionPredicate;
this.cookieHolder = Cookies.newServerRequestHolder(requestHeaders());
chunkedTransfer(true);

Expand Down Expand Up @@ -362,11 +369,6 @@ public HttpVersion version() {
throw new IllegalStateException("request not parsed");
}

@Override
protected void onHandlerStart() {
applyHandler();
}

@Override
public HttpServerResponse compression(boolean compress) {
if (!compress) {
Expand All @@ -390,6 +392,11 @@ else if (channel().pipeline()
return this;
}

@Override
protected void onHandlerStart() {
applyHandler();
}

@Override
protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpContent) {
Expand All @@ -410,11 +417,14 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
}

@Override
protected void checkIfNotPersistent(){
protected void preSendHeadersAndStatus(){
if (!HttpUtil.isTransferEncodingChunked(nettyResponse) && !HttpUtil.isContentLengthSet(
nettyResponse)) {
markPersistent(false);
}
if (compressionPredicate != null && compressionPredicate.test(this, this)) {
compression(true);
}
}

@Override
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/reactor/ipc/netty/http/server/HttpServerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package reactor.ipc.netty.http.server;

import java.util.Objects;
import java.util.function.BiPredicate;

import io.netty.bootstrap.ServerBootstrap;
import reactor.ipc.netty.options.ServerOptions;

Expand All @@ -37,6 +40,8 @@ public static HttpServerOptions.Builder builder() {
return new HttpServerOptions.Builder();
}

private final BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate;

private final int minCompressionResponseSize;
private final int maxInitialLineLength;
private final int maxHeaderSize;
Expand All @@ -52,6 +57,17 @@ private HttpServerOptions(HttpServerOptions.Builder builder) {
this.maxChunkSize = builder.maxChunkSize;
this.validateHeaders = builder.validateHeaders;
this.initialBufferSize = builder.initialBufferSize;
this.compressionPredicate = builder.compressionPredicate;
}

/**
* Returns the compression predicate returning true to compress the response.
* By default the compression is disabled.
*
* @return Returns the compression predicate returning true to compress the response.
*/
public BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate() {
return compressionPredicate;
}

/**
Expand Down Expand Up @@ -153,6 +169,8 @@ public static final class Builder extends ServerOptions.Builder<Builder> {
public static final boolean DEFAULT_VALIDATE_HEADERS = true;
public static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;

private BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate;

private int minCompressionResponseSize = -1;
private int maxInitialLineLength = DEFAULT_MAX_INITIAL_LINE_LENGTH;
private int maxHeaderSize = DEFAULT_MAX_HEADER_SIZE;
Expand All @@ -176,6 +194,27 @@ public final Builder compression(boolean enabled) {
return get();
}

/**
* Enable GZip response compression if the client request presents accept encoding
* headers and the passed {@link java.util.function.Predicate} matches.
* <p>
* note the passed {@link HttpServerRequest} and {@link HttpServerResponse}
* should be considered read-only and the implement SHOULD NOT consume or
* write the request/response in this predicate.
* @param predicate that returns true to compress the response.
*
*
* @return {@code this}
*/
public final Builder compression(BiPredicate<HttpServerRequest, HttpServerResponse> predicate) {
Objects.requireNonNull(predicate, "compressionPredicate");
if (this.minCompressionResponseSize < 0) {
this.minCompressionResponseSize = 0;
}
this.compressionPredicate = predicate;
return get();
}

/**
* Enable GZip response compression if the client request presents accept encoding
* headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

/**
* @author mostroverkhov
* @author smaldini
*/
public class HttpCompressionClientServerTests {

Expand Down Expand Up @@ -162,12 +163,87 @@ public void serverCompressionAlwaysEnabled() throws Exception {
.block();
}



@Test
@Ignore
public void serverCompressionEnabledSmallResponse() throws Exception {
HttpServer server = HttpServer.create(o -> o.port(0)
.compression(25));

NettyContext nettyContext =
server.newHandler((in, out) -> out.header("content-length", "5")
.sendString(Mono.just("reply")))
.block(Duration.ofMillis(10_000));

//don't activate compression on the client options to avoid auto-handling (which removes the header)
HttpClient client = HttpClient.create(o -> o.connectAddress(() -> address(nettyContext)));
HttpClientResponse resp =
//edit the header manually to attempt to trigger compression on server side
client.get("/test", req -> req.header("Accept-Encoding", "gzip"))
.block();

//check the server didn't send the gzip header, only transfer-encoding
HttpHeaders headers = resp.responseHeaders();
assertThat(headers.get("conTENT-encoding")).isNull();

//check the server sent plain text
String reply = resp.receive()
.asString()
.blockFirst();
Assert.assertEquals("reply", reply);

resp.dispose();
nettyContext.dispose();
nettyContext.onClose()
.block();
}

@Test
public void serverCompressionPredicateTrue() throws Exception {
HttpServer server = HttpServer.create(o -> o.port(0)
.compression((req, res) -> true));

NettyContext nettyContext =
server.newHandler((in, out) -> out.sendString(Mono.just("reply")))
.block(Duration.ofMillis(10_000));

//don't activate compression on the client options to avoid auto-handling (which removes the header)
HttpClient client = HttpClient.create(o -> o.connectAddress(() -> address(nettyContext)));
HttpClientResponse resp =
//edit the header manually to attempt to trigger compression on server side
client.get("/test", req -> req.header("Accept-Encoding", "gzip"))
.block();

assertThat(resp.responseHeaders().get("content-encoding")).isEqualTo("gzip");

byte[] replyBuffer = resp.receive()
.aggregate()
.asByteArray()
.block();

assertThat(new String(replyBuffer)).isNotEqualTo("reply");

GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(replyBuffer));
byte deflatedBuf[] = new byte[1024];
int readable = gis.read(deflatedBuf);
gis.close();

assertThat(readable).isGreaterThan(0);

String deflated = new String(deflatedBuf, 0, readable);

assertThat(deflated).isEqualTo("reply");

nettyContext.dispose();
nettyContext.onClose()
.block();
}

@Test
public void serverCompressionPredicateFalse() throws Exception {
HttpServer server = HttpServer.create(o -> o.port(0)
.compression((req, res) -> false));

NettyContext nettyContext =
server.newHandler((in, out) -> out.sendString(Mono.just("reply")))
.block(Duration.ofMillis(10_000));
Expand Down

0 comments on commit 288a3bc

Please sign in to comment.