From 495ecf88773736f3da1dd5ea93de78d7e0a9d0fe Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Mon, 26 Feb 2018 11:49:53 -0800 Subject: [PATCH] Tweak when to mark an HTTP connection persistent As a server response, if no identified message length, do not persist As a server empty body response, always assume persist As a client request, do not assume anything --- .../reactor/ipc/netty/http/HttpOperations.java | 11 +++++++---- .../netty/http/server/HttpServerOperations.java | 14 +++++++++++++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/http/HttpOperations.java b/src/main/java/reactor/ipc/netty/http/HttpOperations.java index d7be7bca4a..ea00651786 100644 --- a/src/main/java/reactor/ipc/netty/http/HttpOperations.java +++ b/src/main/java/reactor/ipc/netty/http/HttpOperations.java @@ -147,10 +147,8 @@ public Mono then() { .remove(HttpHeaderNames.TRANSFER_ENCODING); } - if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) && !HttpUtil.isContentLengthSet( - outboundHttpMessage())) { - markPersistent(false); - } + shouldNotPersist(); + return channel().writeAndFlush(outboundHttpMessage()); } else { @@ -159,6 +157,11 @@ public Mono then() { }); } + protected void shouldNotPersist(){ + //default doesn't imply anything - only server usually implies if connection + // should default persist (keep-alive) when response is not self defined + } + protected abstract HttpMessage newFullEmptyBodyMessage(); @Override diff --git a/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java b/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java index a27f84e129..eae4779c99 100644 --- a/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java +++ b/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java @@ -281,7 +281,10 @@ public HttpHeaders responseHeaders() { public Mono send() { if (markSentHeaderAndBody()) { HttpMessage response = newFullEmptyBodyMessage(); - return FutureMono.deferFuture(() -> channel().writeAndFlush(response)); + return FutureMono.deferFuture(() -> { + markPersistent(true); + return channel().writeAndFlush(response); + }); } else { return Mono.empty(); @@ -387,6 +390,14 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { } } + @Override + protected void shouldNotPersist(){ + if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) && !HttpUtil.isContentLengthSet( + outboundHttpMessage())) { + markPersistent(false); + } + } + @Override protected void onOutboundComplete() { if (isWebsocket()) { @@ -402,6 +413,7 @@ protected void onOutboundComplete() { log.debug("No sendHeaders() called before complete, sending " + "zero-length header"); } + markPersistent(true); f = channel().writeAndFlush(newFullEmptyBodyMessage()); } else if (markSentBody()) {