Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immediately mark outbound as complete when sending Mono or Object #3250

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ else if (!SCHEME_PATTERN.matcher(tempUri).matches()) {
*/
protected abstract HttpMessage outboundHttpMessage();

HttpMessage prepareHttpMessage(ByteBuf buffer) {
protected HttpMessage prepareHttpMessage(ByteBuf buffer) {
HttpMessage msg;
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0 ||
isContentAlwaysEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
Expand Down Expand Up @@ -57,7 +55,7 @@
*
* @author Violeta Georgieva
*/
final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implements ChannelFutureListener {
final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler {

final BiPredicate<HttpServerRequest, HttpServerResponse> compress;
final ServerCookieDecoder cookieDecoder;
Expand Down Expand Up @@ -195,31 +193,11 @@ else if (msg instanceof HttpResponse && HttpResponseStatus.CONTINUE.equals(((Htt
}
else {
//"FutureReturnValueIgnored" this is deliberate
ChannelFuture f = ctx.write(msg, promise);
ctx.write(msg, promise);
if (msg instanceof LastHttpContent) {
pendingResponse = false;
f.addListener(this);
ctx.read();
}
}
}

@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(future.channel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -109,7 +110,7 @@
* @author Stephane Maldini1
*/
class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerResponse>
implements HttpServerRequest, HttpServerResponse {
implements HttpServerRequest, HttpServerResponse, GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> {

final BiPredicate<HttpServerRequest, HttpServerResponse> configuredCompressionPredicate;
final ConnectionInfo connectionInfo;
Expand All @@ -133,6 +134,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
String path;
Future<?> requestTimeoutFuture;
Consumer<? super HttpHeaders> trailerHeadersConsumer;
HttpMessage fullHttpResponse;

volatile Context currentContext;

Expand All @@ -148,6 +150,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.formDecoderProvider = replaced.formDecoderProvider;
this.is100ContinueExpected = replaced.is100ContinueExpected;
this.isHttp2 = replaced.isHttp2;
this.fullHttpResponse = replaced.fullHttpResponse;
this.mapHandle = replaced.mapHandle;
this.nettyRequest = replaced.nettyRequest;
this.nettyResponse = replaced.nettyResponse;
Expand Down Expand Up @@ -514,11 +517,96 @@ public ZonedDateTime timestamp() {
return timestamp;
}

@Override
@SuppressWarnings("unchecked")
public NettyOutbound send(Publisher<? extends ByteBuf> source) {
if (!channel().isActive()) {
return then(Mono.error(AbortedException.beforeSend()));
}
if (source instanceof Mono) {
return new PostHeadersNettyOutbound(((Mono<ByteBuf>) source)
.flatMap(b -> {
if (!hasSentHeaders()) {
try {
beforeMarkSentHeaders();

fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
b.release();
return Mono.error(e);
}

onComplete();
return Mono.<Void>empty();
}

if (log.isDebugEnabled()) {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
}
b.release();
return Mono.empty();
})
.doOnDiscard(ByteBuf.class, ByteBuf::release), this, null);
}
return super.send(source);
}

@Override
public NettyOutbound sendObject(Object message) {
if (!channel().isActive()) {
ReactorNetty.safeRelease(message);
return then(Mono.error(AbortedException.beforeSend()));
}
if (message instanceof ByteBuf) {
ByteBuf b = (ByteBuf) message;
return new PostHeadersNettyOutbound(Mono.create(sink -> {
if (!hasSentHeaders()) {
try {
beforeMarkSentHeaders();

fullHttpResponse = prepareHttpMessage(b);

afterMarkSentHeaders();
}
catch (RuntimeException e) {
// If afterMarkSentHeaders throws an exception there is no need to release the ByteBuf here.
// It will be released by PostHeadersNettyOutbound as there are on error/cancel hooks
sink.error(e);
return;
}

onComplete();
sink.success();
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Dropped HTTP content, since response has been sent already: {}"), b);
}
b.release();
sink.success();
}
}), this, b);
}
return super.sendObject(message);
}

@Override
public Mono<Void> send() {
return FutureMono.deferFuture(() -> markSentHeaderAndBody() ?
channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER)) :
channel().newSucceededFuture());
return Mono.create(sink -> {
if (!hasSentHeaders()) {
onComplete();
sink.success();
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Response has been sent already."));
}
sink.success();
}
});
}

@Override
Expand Down Expand Up @@ -575,6 +663,46 @@ public HttpServerResponse status(HttpResponseStatus status) {
return this;
}

@Override
public Mono<Void> then() {
if (!channel().isActive()) {
return Mono.error(AbortedException.beforeSend());
}

if (hasSentHeaders()) {
return Mono.empty();
}

return FutureMono.deferFuture(() -> {
if (!hasSentHeaders()) {
beforeMarkSentHeaders();

HttpMessage msg = outboundHttpMessage();
boolean last = false;
int contentLength = HttpUtil.getContentLength(msg, -1);
if (contentLength == 0 || isContentAlwaysEmpty()) {
last = true;
msg = newFullBodyMessage(Unpooled.EMPTY_BUFFER);
}
else if (contentLength > 0) {
responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
}

afterMarkSentHeaders();

if (!last) {
return markSentHeaders() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
else {
return markSentHeaderAndBody() ? channel().writeAndFlush(msg) : channel().newSucceededFuture();
}
}
else {
return channel().newSucceededFuture();
}
});
}

@Override
public HttpServerResponse trailerHeaders(Consumer<? super HttpHeaders> trailerHeaders) {
this.trailerHeadersConsumer = Objects.requireNonNull(trailerHeaders, "trailerHeaders");
Expand Down Expand Up @@ -749,11 +877,10 @@ protected void onOutboundComplete() {
}
if (markSentHeaderAndBody()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "No sendHeaders() called before complete, sending " +
"zero-length header"));
log.debug(format(channel(), "Headers are not sent before onComplete()."));
}

f = channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER));
f = channel().writeAndFlush(fullHttpResponse != null ? fullHttpResponse : newFullBodyMessage(EMPTY_BUFFER));
}
else if (markSentBody()) {
HttpHeaders trailerHeaders = null;
Expand Down Expand Up @@ -790,35 +917,29 @@ else if (markSentBody()) {
}
else {
discard();
terminate();
return;
}
f.addListener(s -> {
discard();
if (!s.isSuccess() && log.isDebugEnabled()) {
log.debug(format(channel(), "Failed flushing last frame"), s.cause());
}
});

f.addListener(this);
}

static void cleanHandlerTerminate(Channel ch) {
ChannelOperations<?, ?> ops = get(ch);

if (ops == null) {
return;
}

ops.discard();

//Try to defer the disposing to leave a chance for any synchronous complete following this callback
if (!ops.isSubscriptionDisposed()) {
ch.eventLoop()
.execute(((HttpServerOperations) ops)::terminate);
@Override
public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) {
if (!future.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
//if already disposed, we can immediately call terminate
((HttpServerOperations) ops).terminate();
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Last HTTP packet was sent, terminating the channel"));
}
}

discard();

terminate();
}

static long requestsCounter(Channel channel) {
Expand Down Expand Up @@ -896,8 +1017,15 @@ else if (cause instanceof TooLongHttpHeaderException) {
}
}

//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);
if (ops instanceof HttpServerOperations) {
HttpServerOperations serverOps = (HttpServerOperations) ops;
serverOps.fullHttpResponse = response;
serverOps.onComplete();
}
else {
//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);
}

listener.onStateChange(ops, REQUEST_DECODING_FAILED);
}
Expand All @@ -922,13 +1050,15 @@ protected void onOutboundError(Throwable err) {
nettyResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
responseHeaders.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
channel().writeAndFlush(newFullBodyMessage(EMPTY_BUFFER))
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
return;
}

markSentBody();
log.error(format(channel(), "Error finishing response. Closing connection"), err);
channel().writeAndFlush(EMPTY_BUFFER)
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.function.BiPredicate;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -62,8 +61,7 @@
* Replace {@link io.netty.handler.codec.http.HttpServerKeepAliveHandler} with extra
* handler management.
*/
final class HttpTrafficHandler extends ChannelDuplexHandler
implements Runnable, ChannelFutureListener {
final class HttpTrafficHandler extends ChannelDuplexHandler implements Runnable {

static final String MULTIPART_PREFIX = "multipart";

Expand Down Expand Up @@ -339,13 +337,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
pendingResponses);
}
ctx.write(msg, promise.unvoid())
.addListener(this)
.addListener(ChannelFutureListener.CLOSE);
return;
}

ctx.write(msg, promise.unvoid())
.addListener(this);
ctx.write(msg, promise);

if (!persistentConnection) {
return;
Expand Down Expand Up @@ -457,25 +453,6 @@ public void run() {
overflow = false;
}

@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Sending last HTTP packet was not successful, terminating the channel"),
future.cause());
}
}
else {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(format(future.channel(),
"Last HTTP packet was sent, terminating the channel"));
}
}

HttpServerOperations.cleanHandlerTerminate(future.channel());
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
discard();
Expand Down
Loading