Skip to content

Commit 0e7716a

Browse files
committed
(ISSUE-99) Added support for flushing chunks
- Use the chunk memory limit setting to automatically flush - Added a flush method to ChunkResponder as a default method to the interface to be backward compatible
1 parent 47413e5 commit 0e7716a

File tree

7 files changed

+76
-7
lines changed

7 files changed

+76
-7
lines changed

src/main/java/io/cdap/http/ChunkResponder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
import io.netty.buffer.ByteBuf;
2020

2121
import java.io.Closeable;
22+
import java.io.Flushable;
2223
import java.io.IOException;
2324
import java.nio.ByteBuffer;
2425

2526
/**
2627
* A responder for sending chunk-encoded response
2728
*/
28-
public interface ChunkResponder extends Closeable {
29+
public interface ChunkResponder extends Closeable, Flushable {
2930

3031
/**
3132
* Adds a chunk of data to the response. The content will be sent to the client asynchronously.
@@ -43,6 +44,14 @@ public interface ChunkResponder extends Closeable {
4344
*/
4445
void sendChunk(ByteBuf chunk) throws IOException;
4546

47+
/**
48+
* Flushes all the chunks writen so far to the client asynchronously.
49+
*/
50+
@Override
51+
default void flush() {
52+
// no-op
53+
}
54+
4655
/**
4756
* Closes this responder which signals the end of the chunk response.
4857
*/

src/main/java/io/cdap/http/internal/BasicHttpResponder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,13 @@ final class BasicHttpResponder extends AbstractHttpResponder {
6767
private final Channel channel;
6868
private final AtomicBoolean responded;
6969
private final boolean sslEnabled;
70+
private final int chunkMemoryLimit;
7071

71-
BasicHttpResponder(Channel channel, boolean sslEnabled) {
72+
BasicHttpResponder(Channel channel, boolean sslEnabled, int chunkMemoryLimit) {
7273
this.channel = channel;
7374
this.responded = new AtomicBoolean(false);
7475
this.sslEnabled = sslEnabled;
76+
this.chunkMemoryLimit = chunkMemoryLimit;
7577
}
7678

7779
@Override
@@ -90,7 +92,7 @@ public ChunkResponder sendChunkStart(HttpResponseStatus status, HttpHeaders head
9092

9193
checkNotResponded();
9294
channel.write(response);
93-
return new ChannelChunkResponder(channel);
95+
return new ChannelChunkResponder(channel, chunkMemoryLimit);
9496
}
9597

9698
@Override

src/main/java/io/cdap/http/internal/ChannelChunkResponder.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.IOException;
2828
import java.nio.ByteBuffer;
2929
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
3031

3132
/**
3233
* A {@link ChunkResponder} that writes chunks to a {@link Channel}.
@@ -35,10 +36,14 @@ final class ChannelChunkResponder implements ChunkResponder {
3536

3637
private final Channel channel;
3738
private final AtomicBoolean closed;
39+
private final AtomicLong bufferedSize;
40+
private final int chunkMemoryLimit;
3841

39-
ChannelChunkResponder(Channel channel) {
42+
ChannelChunkResponder(Channel channel, int chunkMemoryLimit) {
4043
this.channel = channel;
41-
this.closed = new AtomicBoolean(false);
44+
this.closed = new AtomicBoolean();
45+
this.bufferedSize = new AtomicLong();
46+
this.chunkMemoryLimit = chunkMemoryLimit;
4247
}
4348

4449
@Override
@@ -54,14 +59,35 @@ public void sendChunk(ByteBuf chunk) throws IOException {
5459
if (!channel.isActive()) {
5560
throw new IOException("Connection already closed.");
5661
}
62+
int chunkSize = chunk.readableBytes();
5763
channel.write(new DefaultHttpContent(chunk));
64+
tryFlush(chunkSize);
5865
}
5966

6067
@Override
61-
public void close() throws IOException {
68+
public void flush() {
69+
// Use the limit as the size to force a flush
70+
tryFlush(chunkMemoryLimit);
71+
}
72+
73+
@Override
74+
public void close() {
6275
if (!closed.compareAndSet(false, true)) {
6376
return;
6477
}
6578
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
6679
}
80+
81+
private void tryFlush(int size) {
82+
long newSize = bufferedSize.addAndGet(size);
83+
if (newSize >= chunkMemoryLimit) {
84+
channel.flush();
85+
// Subtract what were flushed.
86+
// This is correct for single thread.
87+
// For concurrent calls, this provides a lower bound,
88+
// meaning more data might get flushed then being subtracted.
89+
// This make sure we won't go over the memory limit, but might flush more often than needed.
90+
bufferedSize.addAndGet(-1 * newSize);
91+
}
92+
}
6793
}

src/main/java/io/cdap/http/internal/RequestRouter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
8181
return;
8282
}
8383
HttpRequest request = (HttpRequest) msg;
84-
BasicHttpResponder responder = new BasicHttpResponder(ctx.channel(), sslEnabled);
84+
BasicHttpResponder responder = new BasicHttpResponder(ctx.channel(), sslEnabled, chunkMemoryLimit);
8585

8686
// Reset the methodInfo for the incoming request error handling
8787
methodInfo = null;

src/main/java/io/cdap/http/internal/WrappedHttpResponder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public void sendChunk(ByteBuf chunk) throws IOException {
6565
chunkResponder.sendChunk(chunk);
6666
}
6767

68+
@Override
69+
public void flush() {
70+
chunkResponder.flush();
71+
}
72+
6873
@Override
6974
public void close() throws IOException {
7075
chunkResponder.close();

src/test/java/io/cdap/http/HttpServerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,19 @@ public void testChunkResponse() throws IOException {
650650
}
651651
}
652652

653+
@Test
654+
public void testLargeChunkResponse() throws IOException {
655+
// Chunk limit for test is 75K, so we request for 150 chunks, each is 1K in length
656+
HttpURLConnection urlConn = request("/test/v1/largeChunk?s=1024&n=150", HttpMethod.GET);
657+
try {
658+
String response = getContent(urlConn);
659+
String expected = String.join("", Collections.nCopies(150 * 1024, "0"));
660+
Assert.assertEquals(expected, response);
661+
} finally {
662+
urlConn.disconnect();
663+
}
664+
}
665+
653666
@Test
654667
public void testStringQueryParam() throws IOException {
655668
// First send without query, for String type, should get defaulted to null.

src/test/java/io/cdap/http/TestHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.nio.ByteBuffer;
3939
import java.nio.channels.FileChannel;
4040
import java.nio.charset.StandardCharsets;
41+
import java.util.Collections;
4142
import java.util.List;
4243
import java.util.SortedSet;
4344
import java.util.concurrent.TimeUnit;
@@ -424,6 +425,19 @@ public void chunk(FullHttpRequest request, HttpResponder responder) throws IOExc
424425
chunker.close();
425426
}
426427

428+
@Path("/largeChunk")
429+
@GET
430+
public void largeChunk(HttpRequest request, HttpResponder responder,
431+
@QueryParam("s") int chunkSize,
432+
@QueryParam("n") int count) throws IOException {
433+
String msg = String.join("", Collections.nCopies(chunkSize, "0"));
434+
try (ChunkResponder chunker = responder.sendChunkStart(HttpResponseStatus.OK)) {
435+
for (int i = 0; i < count; i++) {
436+
chunker.sendChunk(StandardCharsets.UTF_8.encode(msg));
437+
}
438+
}
439+
}
440+
427441
@Path("/produceBody")
428442
@GET
429443
public void produceBody(HttpRequest request, HttpResponder responder,

0 commit comments

Comments
 (0)