Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add possibility to delay the last flush operation #3271

Merged
merged 2 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -938,8 +938,11 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
}
}

discard();
terminateInternal();
}

void terminateInternal() {
discard();
terminate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.logging.HttpMessageArgProviderFactory;
import reactor.netty.http.logging.HttpMessageLogFactory;
import reactor.util.annotation.Nullable;
Expand All @@ -67,6 +68,9 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable

static final HttpVersion H2 = HttpVersion.valueOf("HTTP/2.0");

static final boolean LAST_FLUSH_WHEN_NO_READ = Boolean.parseBoolean(
System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false"));

final BiPredicate<HttpServerRequest, HttpServerResponse> compress;
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
Expand Down Expand Up @@ -96,6 +100,10 @@ final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable

Boolean secure;

boolean read;
boolean needsFlush;
boolean finalizingResponse;

HttpTrafficHandler(
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compress,
ServerCookieDecoder decoder,
Expand Down Expand Up @@ -142,6 +150,7 @@ public void channelActive(ChannelHandlerContext ctx) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
read = true;
if (secure == null) {
secure = ctx.channel().pipeline().get(SslHandler.class) != null;
}
Expand All @@ -152,6 +161,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
// read message and track if it was keepAlive
if (msg instanceof HttpRequest) {
finalizingResponse = false;

if (idleTimeout != null) {
IdleTimeoutHandler.removeIdleTimeoutHandler(ctx.pipeline());
}
Expand Down Expand Up @@ -196,6 +207,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
else {
overflow = false;

if (LAST_FLUSH_WHEN_NO_READ) {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops instanceof HttpServerOperations) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Last HTTP packet was sent, terminating the channel"));
}
((HttpServerOperations) ops).terminateInternal();
}
}

DecoderResult decoderResult = request.decoderResult();
if (decoderResult.isFailure()) {
sendDecodingFailures(decoderResult.cause(), msg);
Expand Down Expand Up @@ -284,6 +305,38 @@ else if (overflow) {
ctx.fireChannelRead(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
endReadAndFlush();
ctx.fireChannelReadComplete();
}

void endReadAndFlush() {
if (read) {
read = false;
if (LAST_FLUSH_WHEN_NO_READ && needsFlush) {
needsFlush = false;
ctx.flush();
}
}
}

@Override
public void flush(ChannelHandlerContext ctx) {
if (LAST_FLUSH_WHEN_NO_READ && finalizingResponse) {
if (needsFlush || !ctx.channel().isWritable()) {
needsFlush = false;
ctx.flush();
}
else {
needsFlush = true;
}
}
else {
ctx.flush();
}
}

void sendDecodingFailures(Throwable t, Object msg) {
sendDecodingFailures(t, msg, null, null);
}
Expand Down Expand Up @@ -330,6 +383,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
if (msg instanceof LastHttpContent) {
finalizingResponse = true;

if (LAST_FLUSH_WHEN_NO_READ) {
needsFlush = !read;
}

if (!shouldKeepAlive()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Detected non persistent http " +
Expand Down Expand Up @@ -404,6 +463,18 @@ public void run() {
HttpRequestHolder holder = (HttpRequestHolder) next;
nextRequest = holder.request;

finalizingResponse = false;

if (LAST_FLUSH_WHEN_NO_READ) {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops instanceof HttpServerOperations) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(ctx.channel(), "Last HTTP packet was sent, terminating the channel"));
}
((HttpServerOperations) ops).terminateInternal();
}
}

DecoderResult decoderResult = nextRequest.decoderResult();
if (decoderResult.isFailure()) {
sendDecodingFailures(decoderResult.cause(), nextRequest, holder.timestamp, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.netty.BaseHttpTest;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
import reactor.test.StepVerifier;
import reactor.util.annotation.Nullable;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -250,6 +255,108 @@ void httpPostRespondsSend(HttpProtocol protocol) throws Exception {
assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(2);
}

@Test
void httpPipeliningGetRespondsSendObject() throws Exception {
String oldValue = System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false");
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", "true");
try {
CountDownLatch latch = new CountDownLatch(64);
EventsRecorder recorder = new EventsRecorder(latch);
disposableServer = createServer(recorder, HttpProtocol.HTTP11,
r -> r.get("/1", (req, res) -> res.sendObject(Unpooled.wrappedBuffer(REPEAT.getBytes(Charset.defaultCharset())))
.then().doOnEach(recorder).doOnCancel(recorder)));

Connection client =
TcpClient.create()
.port(disposableServer.port())
.wiretap(true)
.connectNow();

int port = disposableServer.port();
String address = HttpUtil.formatHostnameForHttp((InetSocketAddress) disposableServer.address()) + ":" + port;
String request = repeatString("GET /1 HTTP/1.1\r\nHost: " + address + "\r\n\r\n");
client.outbound()
.sendObject(Unpooled.wrappedBuffer(request.getBytes(Charset.defaultCharset())))
.then()
.subscribe();

CountDownLatch responses = new CountDownLatch(16);
client.inbound()
.receive()
.asString()
.doOnNext(s -> {
int ind = 0;
while ((ind = s.indexOf("200", ind)) != -1) {
responses.countDown();
ind += 3;
}
})
.subscribe();

assertThat(responses.await(5, TimeUnit.SECONDS)).isTrue();

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(recorder.bufferIsReleased.get()).isEqualTo(16);
assertThat(recorder.fullResponseIsSent.get()).isEqualTo(16);
assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(16);
assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(16);
}
finally {
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", oldValue);
}
}

@Test
void httpPipeliningGetRespondsSendMono() throws Exception {
String oldValue = System.getProperty("reactor.netty.http.server.lastFlushWhenNoRead", "false");
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", "true");
try {
CountDownLatch latch = new CountDownLatch(64);
EventsRecorder recorder = new EventsRecorder(latch);
disposableServer = createServer(recorder, HttpProtocol.HTTP11,
r -> r.get("/1", (req, res) -> res.sendString(Mono.just(REPEAT).delayElement(Duration.ofMillis(10))
.doOnEach(recorder).doOnCancel(recorder))));

Connection client =
TcpClient.create()
.port(disposableServer.port())
.wiretap(true)
.connectNow();

int port = disposableServer.port();
String address = HttpUtil.formatHostnameForHttp((InetSocketAddress) disposableServer.address()) + ":" + port;
String request = repeatString("GET /1 HTTP/1.1\r\nHost: " + address + "\r\n\r\n");
client.outbound()
.sendObject(Unpooled.wrappedBuffer(request.getBytes(Charset.defaultCharset())))
.then()
.subscribe();

CountDownLatch responses = new CountDownLatch(16);
client.inbound()
.receive()
.asString()
.doOnNext(s -> {
int ind = 0;
while ((ind = s.indexOf("200", ind)) != -1) {
responses.countDown();
ind += 3;
}
})
.subscribe();

assertThat(responses.await(5, TimeUnit.SECONDS)).isTrue();

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(recorder.bufferIsReleased.get()).isEqualTo(16);
assertThat(recorder.fullResponseIsSent.get()).isEqualTo(16);
assertThat(recorder.onCompleteIsReceived.get()).isEqualTo(16);
assertThat(recorder.onTerminateIsReceived.get()).isEqualTo(16);
}
finally {
System.setProperty("reactor.netty.http.server.lastFlushWhenNoRead", oldValue);
}
}

@ParameterizedTest
@EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"})
void httpPostRespondsSendFlux(HttpProtocol protocol) throws Exception {
Expand Down Expand Up @@ -386,6 +493,14 @@ static String createString(int length) {
return new String(chars);
}

static String repeatString(String s) {
StringBuilder sb = new StringBuilder(16 * s.length());
for (int i = 0; i < 16; i++) {
sb.append(s);
}
return sb.toString();
}

static Mono<List<String>> sendGetRequest(int port, HttpProtocol protocol) {
return sendRequest(port, protocol, HttpMethod.GET, 1, null);
}
Expand Down