|
26 | 26 | import io.netty.channel.Channel;
|
27 | 27 | import io.netty.channel.ChannelFuture;
|
28 | 28 | import io.netty.channel.ChannelFutureListener;
|
| 29 | +import io.netty.channel.ChannelHandler; |
29 | 30 | import io.netty.channel.ChannelHandlerContext;
|
| 31 | +import io.netty.channel.ChannelPipeline; |
30 | 32 | import io.netty.channel.DefaultFileRegion;
|
31 | 33 | import io.netty.channel.FileRegion;
|
32 | 34 | import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
|
51 | 53 | import java.io.IOException;
|
52 | 54 | import java.io.RandomAccessFile;
|
53 | 55 | import java.nio.charset.StandardCharsets;
|
| 56 | +import java.util.NoSuchElementException; |
54 | 57 | import java.util.concurrent.atomic.AtomicBoolean;
|
55 | 58 | import javax.annotation.Nullable;
|
56 | 59 |
|
@@ -127,10 +130,21 @@ public void sendFile(File file, HttpHeaders headers) throws IOException {
|
127 | 130 | // The HttpChunkedInput will write out the last content
|
128 | 131 | channel.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 8192)));
|
129 | 132 | } else {
|
130 |
| - // The FileRegion will close the file channel when it is done sending. |
131 |
| - FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, file.length()); |
132 |
| - channel.write(region); |
133 |
| - channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); |
| 133 | + final Runnable completion = prepareSendFile(channel); |
| 134 | + try { |
| 135 | + // The FileRegion will close the file channel when it is done sending. |
| 136 | + FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, file.length()); |
| 137 | + channel.write(region); |
| 138 | + channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(new ChannelFutureListener() { |
| 139 | + @Override |
| 140 | + public void operationComplete(ChannelFuture future) { |
| 141 | + completion.run(); |
| 142 | + } |
| 143 | + }); |
| 144 | + } catch (Throwable t) { |
| 145 | + completion.run(); |
| 146 | + throw t; |
| 147 | + } |
134 | 148 | }
|
135 | 149 | } catch (Throwable t) {
|
136 | 150 | try {
|
@@ -228,6 +242,35 @@ private void checkNotResponded() {
|
228 | 242 | }
|
229 | 243 | }
|
230 | 244 |
|
| 245 | + /** |
| 246 | + * Prepares the given {@link Channel} for the sending file. |
| 247 | + * |
| 248 | + * @param channel the channel to prepare |
| 249 | + * @return a {@link Runnable} that should be called when the send file is completed to revert the action |
| 250 | + */ |
| 251 | + private Runnable prepareSendFile(Channel channel) { |
| 252 | + // Remove the "compressor" from the pipeline to skip content encoding since FileRegion do zero-copy write, |
| 253 | + // hence bypassing any user space data operation. |
| 254 | + try { |
| 255 | + final ChannelPipeline pipeline = channel.pipeline(); |
| 256 | + final ChannelHandler compressor = pipeline.remove("compressor"); |
| 257 | + return new Runnable() { |
| 258 | + @Override |
| 259 | + public void run() { |
| 260 | + pipeline.addAfter("codec", "compressor", compressor); |
| 261 | + } |
| 262 | + }; |
| 263 | + } catch (NoSuchElementException e) { |
| 264 | + // Ignore if there is no compressor |
| 265 | + return new Runnable() { |
| 266 | + @Override |
| 267 | + public void run() { |
| 268 | + // no-op |
| 269 | + } |
| 270 | + }; |
| 271 | + } |
| 272 | + } |
| 273 | + |
231 | 274 | /**
|
232 | 275 | * A {@link ChunkedInput} implementation that produce chunks using {@link BodyProducer}.
|
233 | 276 | */
|
|
0 commit comments