Skip to content

(ISSUE-101) Release the empty buffer returned by BodyProducer #102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 12, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 35 additions & 46 deletions src/main/java/io/cdap/http/internal/BasicHttpResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -137,12 +136,7 @@ public void sendFile(File file, HttpHeaders headers) throws IOException {
// The FileRegion will close the file channel when it is done sending.
FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, file.length());
channel.write(region);
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
completion.run();
}
});
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> completion.run());
} catch (Throwable t) {
completion.run();
throw t;
Expand Down Expand Up @@ -188,18 +182,14 @@ public void sendContent(HttpResponseStatus status, final BodyProducer bodyProduc
checkNotResponded();

// Streams the data produced by the given BodyProducer
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
callBodyProducerHandleError(bodyProducer, future.cause());
channel.close();
return;
}
channel.writeAndFlush(new HttpChunkedInput(new BodyProducerChunkedInput(bodyProducer, contentLength)))
.addListener(createBodyProducerCompletionListener(bodyProducer));
channel.writeAndFlush(response).addListener(future -> {
if (!future.isSuccess()) {
callBodyProducerHandleError(bodyProducer, future.cause());
channel.close();
return;
}
channel.writeAndFlush(new HttpChunkedInput(new BodyProducerChunkedInput(bodyProducer, contentLength)))
.addListener(createBodyProducerCompletionListener(bodyProducer));
});
}

Expand All @@ -211,21 +201,18 @@ boolean isResponded() {
}

private ChannelFutureListener createBodyProducerCompletionListener(final BodyProducer bodyProducer) {
return new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
callBodyProducerHandleError(bodyProducer, future.cause());
channel.close();
return;
}
return future -> {
if (!future.isSuccess()) {
callBodyProducerHandleError(bodyProducer, future.cause());
channel.close();
return;
}

try {
bodyProducer.finished();
} catch (Throwable t) {
callBodyProducerHandleError(bodyProducer, t);
channel.close();
}
try {
bodyProducer.finished();
} catch (Throwable t) {
callBodyProducerHandleError(bodyProducer, t);
channel.close();
}
};
}
Expand Down Expand Up @@ -256,19 +243,11 @@ private Runnable prepareSendFile(Channel channel) {
try {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.remove("compressor");
return new Runnable() {
@Override
public void run() {
pipeline.addAfter("codec", "compressor", new HttpContentCompressor());
}
};
return () -> pipeline.addAfter("codec", "compressor", new HttpContentCompressor());
} catch (NoSuchElementException e) {
// Ignore if there is no compressor
return new Runnable() {
@Override
public void run() {
// no-op
}
return () -> {
// no-op
};
}
}
Expand Down Expand Up @@ -299,9 +278,19 @@ public boolean isEndOfInput() throws Exception {
}

completed = !nextChunk.isReadable();
if (completed && length >= 0 && bytesProduced != length) {
throw new IllegalStateException("Body size doesn't match with content length. " +
"Content-Length: " + length + ", bytes produced: " + bytesProduced);
if (completed) {
try {
if (length >= 0 && bytesProduced != length) {
throw new IllegalStateException("Body size doesn't match with content length. " +
"Content-Length: " + length + ", bytes produced: " + bytesProduced);
}
} finally {
// We should release the buffer if it is completed since this will be the last place that uses the buffer,
// as the buffer won't be returned by the readChunk method.
// Also, the buffer won't get double released since this method entrance is protected by the `completed`
// field.
nextChunk.release();
}
}

return completed;
Expand Down