Skip to content

Commit ae43439

Browse files
authored
Merge pull request #102 from cdapio/issue-101-fix-buf-leak
(ISSUE-101) Release the empty buffer returned by BodyProducer
2 parents d266bc8 + f8867ca commit ae43439

File tree

1 file changed

+35
-46
lines changed

1 file changed

+35
-46
lines changed

src/main/java/io/cdap/http/internal/BasicHttpResponder.java

Lines changed: 35 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.netty.buffer.ByteBufAllocator;
2525
import io.netty.buffer.Unpooled;
2626
import io.netty.channel.Channel;
27-
import io.netty.channel.ChannelFuture;
2827
import io.netty.channel.ChannelFutureListener;
2928
import io.netty.channel.ChannelHandlerContext;
3029
import io.netty.channel.ChannelPipeline;
@@ -137,12 +136,7 @@ public void sendFile(File file, HttpHeaders headers) throws IOException {
137136
// The FileRegion will close the file channel when it is done sending.
138137
FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, file.length());
139138
channel.write(region);
140-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(new ChannelFutureListener() {
141-
@Override
142-
public void operationComplete(ChannelFuture future) {
143-
completion.run();
144-
}
145-
});
139+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> completion.run());
146140
} catch (Throwable t) {
147141
completion.run();
148142
throw t;
@@ -188,18 +182,14 @@ public void sendContent(HttpResponseStatus status, final BodyProducer bodyProduc
188182
checkNotResponded();
189183

190184
// Streams the data produced by the given BodyProducer
191-
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
192-
193-
@Override
194-
public void operationComplete(ChannelFuture future) throws Exception {
195-
if (!future.isSuccess()) {
196-
callBodyProducerHandleError(bodyProducer, future.cause());
197-
channel.close();
198-
return;
199-
}
200-
channel.writeAndFlush(new HttpChunkedInput(new BodyProducerChunkedInput(bodyProducer, contentLength)))
201-
.addListener(createBodyProducerCompletionListener(bodyProducer));
185+
channel.writeAndFlush(response).addListener(future -> {
186+
if (!future.isSuccess()) {
187+
callBodyProducerHandleError(bodyProducer, future.cause());
188+
channel.close();
189+
return;
202190
}
191+
channel.writeAndFlush(new HttpChunkedInput(new BodyProducerChunkedInput(bodyProducer, contentLength)))
192+
.addListener(createBodyProducerCompletionListener(bodyProducer));
203193
});
204194
}
205195

@@ -211,21 +201,18 @@ boolean isResponded() {
211201
}
212202

213203
private ChannelFutureListener createBodyProducerCompletionListener(final BodyProducer bodyProducer) {
214-
return new ChannelFutureListener() {
215-
@Override
216-
public void operationComplete(ChannelFuture future) throws Exception {
217-
if (!future.isSuccess()) {
218-
callBodyProducerHandleError(bodyProducer, future.cause());
219-
channel.close();
220-
return;
221-
}
204+
return future -> {
205+
if (!future.isSuccess()) {
206+
callBodyProducerHandleError(bodyProducer, future.cause());
207+
channel.close();
208+
return;
209+
}
222210

223-
try {
224-
bodyProducer.finished();
225-
} catch (Throwable t) {
226-
callBodyProducerHandleError(bodyProducer, t);
227-
channel.close();
228-
}
211+
try {
212+
bodyProducer.finished();
213+
} catch (Throwable t) {
214+
callBodyProducerHandleError(bodyProducer, t);
215+
channel.close();
229216
}
230217
};
231218
}
@@ -256,19 +243,11 @@ private Runnable prepareSendFile(Channel channel) {
256243
try {
257244
final ChannelPipeline pipeline = channel.pipeline();
258245
pipeline.remove("compressor");
259-
return new Runnable() {
260-
@Override
261-
public void run() {
262-
pipeline.addAfter("codec", "compressor", new HttpContentCompressor());
263-
}
264-
};
246+
return () -> pipeline.addAfter("codec", "compressor", new HttpContentCompressor());
265247
} catch (NoSuchElementException e) {
266248
// Ignore if there is no compressor
267-
return new Runnable() {
268-
@Override
269-
public void run() {
270-
// no-op
271-
}
249+
return () -> {
250+
// no-op
272251
};
273252
}
274253
}
@@ -299,9 +278,19 @@ public boolean isEndOfInput() throws Exception {
299278
}
300279

301280
completed = !nextChunk.isReadable();
302-
if (completed && length >= 0 && bytesProduced != length) {
303-
throw new IllegalStateException("Body size doesn't match with content length. " +
304-
"Content-Length: " + length + ", bytes produced: " + bytesProduced);
281+
if (completed) {
282+
try {
283+
if (length >= 0 && bytesProduced != length) {
284+
throw new IllegalStateException("Body size doesn't match with content length. " +
285+
"Content-Length: " + length + ", bytes produced: " + bytesProduced);
286+
}
287+
} finally {
288+
// We should release the buffer if it is completed since this will be the last place that uses the buffer,
289+
// as the buffer won't be returned by the readChunk method.
290+
// Also, the buffer won't get double released since this method entrance is protected by the `completed`
291+
// field.
292+
nextChunk.release();
293+
}
305294
}
306295

307296
return completed;

0 commit comments

Comments
 (0)