From 358169b89f32c00bf229b4e42ccef756588da71f Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sun, 26 Jan 2014 15:46:35 -0800 Subject: [PATCH] Send window update after the peer sends half the limit on a stream or connection. --- .../okhttp/internal/spdy/Settings.java | 5 +- .../okhttp/internal/spdy/SpdyConnection.java | 48 ++++++------ .../okhttp/internal/spdy/SpdyStream.java | 49 ++++++------ .../internal/spdy/SpdyConnectionTest.java | 74 ++++++++++++------- 4 files changed, 101 insertions(+), 75 deletions(-) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java index f886b1b2ad48..c05d6b174f10 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Settings.java @@ -76,9 +76,9 @@ void clear() { Arrays.fill(values, 0); } - void set(int id, int idFlags, int value) { + Settings set(int id, int idFlags, int value) { if (id >= values.length) { - return; // Discard unknown settings. + return this; // Discard unknown settings. } int bit = 1 << id; @@ -95,6 +95,7 @@ void set(int id, int idFlags, int value) { } values[id] = value; + return this; } /** Returns true if a value has been assigned for the setting {@code id}. */ diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index b784392d1320..0f9e26510fc8 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -82,24 +82,34 @@ public final class SpdyConnection implements Closeable { private Map pings; private int nextPingId; + static final int INITIAL_WINDOW_SIZE = 65535; + /** - * Initial window size to use for the connection and new streams. Until the - * peer sends an update, this will is initialized to {@code 65535}. + * The total number of bytes consumed by the application, but not yet + * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection. */ - int initialWindowSize = 65535; + // Visible for testing + long unacknowledgedBytesRead = 0; /** * Count of bytes that can be written on the connection before receiving a * window update. */ // Visible for testing - long bytesLeftInWriteWindow = initialWindowSize; + long bytesLeftInWriteWindow; + /** Settings we communicate to the peer. */ // TODO: Do we want to dynamically adjust settings, or KISS and only set once? - // Settings we might send include toggling push, adjusting compression table size. - final Settings okHttpSettings; + final Settings okHttpSettings = new Settings() + .set(Settings.INITIAL_WINDOW_SIZE, 0, INITIAL_WINDOW_SIZE); + // TODO: implement stream limit + // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max); + + /** Settings we receive from the peer. */ // TODO: MWS will need to guard on this setting before attempting to push. - final Settings peerSettings; + final Settings peerSettings = new Settings() + .set(Settings.INITIAL_WINDOW_SIZE, 0, INITIAL_WINDOW_SIZE); + private boolean receivedInitialPeerSettings = false; final FrameReader frameReader; final FrameWriter frameWriter; @@ -124,11 +134,8 @@ private SpdyConnection(Builder builder) { } else { throw new AssertionError(protocol); } - okHttpSettings = new Settings(); - peerSettings = new Settings(); - // TODO: implement stream limit - // okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max); - bufferPool = new ByteArrayPool(initialWindowSize * 8); // TODO: revisit size limit! + bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(); + bufferPool = new ByteArrayPool(INITIAL_WINDOW_SIZE * 8); // TODO: revisit size limit! frameReader = variant.newReader(builder.in, client); frameWriter = variant.newWriter(builder.out, client); @@ -203,8 +210,7 @@ public SpdyStream newStream(List
requestHeaders, boolean out, boolean in } streamId = nextStreamId; nextStreamId += 2; - stream = new SpdyStream( - streamId, this, outFinished, inFinished, priority, requestHeaders, initialWindowSize); + stream = new SpdyStream(streamId, this, outFinished, inFinished, priority, requestHeaders); if (stream.isOpen()) { streams.put(streamId, stream); setIdle(false); @@ -286,11 +292,11 @@ void writeSynReset(int streamId, ErrorCode statusCode) throws IOException { frameWriter.rstStream(streamId, statusCode); } - void writeWindowUpdateLater(final int streamId, final long windowSizeIncrement) { - executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { + void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) { + executor.submit(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) { @Override public void execute() { try { - frameWriter.windowUpdate(streamId, windowSizeIncrement); + frameWriter.windowUpdate(streamId, unacknowledgedBytesRead); } catch (IOException ignored) { } } @@ -568,7 +574,7 @@ private Reader() { // Create a stream. final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished, - inFinished, priority, headerBlock, initialWindowSize); + inFinished, priority, headerBlock); lastGoodStreamId = streamId; streams.put(streamId, newStream); executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) { @@ -607,6 +613,7 @@ private Reader() { long delta = 0; SpdyStream[] streamsToNotify = null; synchronized (SpdyConnection.this) { + int priorWriteWindowSize = peerSettings.getInitialWindowSize(); if (clearPrevious) { peerSettings.clear(); } else { @@ -616,9 +623,8 @@ private Reader() { ackSettingsLater(); } int peerInitialWindowSize = peerSettings.getInitialWindowSize(); - if (peerInitialWindowSize != -1 && peerInitialWindowSize != initialWindowSize) { - delta = peerInitialWindowSize - initialWindowSize; - SpdyConnection.this.initialWindowSize = peerInitialWindowSize; + if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) { + delta = peerInitialWindowSize - priorWriteWindowSize; if (!receivedInitialPeerSettings) { addBytesToWriteWindow(delta); receivedInitialPeerSettings = true; diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index c8ac6d9e945d..87ce18a0ba28 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -35,12 +35,12 @@ public final class SpdyStream { // blocking operations are performed while the lock is held. /** - * The number of unacknowledged bytes at which the input stream will send - * the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's - * window size, otherwise the remote peer will stop sending data on this - * stream. (Chrome 25 uses 5 MiB.) + * The total number of bytes consumed by the application + * (with {@link SpdyDataInputStream#read}), but not yet acknowledged by + * sending a {@code WINDOW_UPDATE} frame on this stream. */ - int windowUpdateThreshold; + // Visible for testing + long unacknowledgedBytesRead = 0; /** * Count of bytes that can be written on the stream before receiving a @@ -48,7 +48,7 @@ public final class SpdyStream { * available bytes in {@code connection.bytesLeftInWriteWindow}. */ // guarded by this - long bytesLeftInWriteWindow = 0; + long bytesLeftInWriteWindow; private final int id; private final SpdyConnection connection; @@ -72,14 +72,13 @@ public final class SpdyStream { private ErrorCode errorCode = null; SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished, - int priority, List
requestHeaders, int initialWriteWindow) { + int priority, List
requestHeaders) { if (connection == null) throw new NullPointerException("connection == null"); if (requestHeaders == null) throw new NullPointerException("requestHeaders == null"); this.id = id; this.connection = connection; - this.bytesLeftInWriteWindow = initialWriteWindow; - this.windowUpdateThreshold = initialWriteWindow / 2; - this.in = new SpdyDataInputStream(initialWriteWindow); + this.bytesLeftInWriteWindow = connection.peerSettings.getInitialWindowSize(); + this.in = new SpdyDataInputStream(connection.okHttpSettings.getInitialWindowSize()); this.out = new SpdyDataOutputStream(); this.in.finished = inFinished; this.out.finished = outFinished; @@ -363,13 +362,6 @@ private SpdyDataInputStream(int bufferLength) { */ private boolean finished; - /** - * The total number of bytes consumed by the application (with {@link - * #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE} - * frame. - */ - private int unacknowledgedBytes = 0; - @Override public int available() throws IOException { synchronized (SpdyStream.this) { checkNotClosed(); @@ -388,6 +380,7 @@ private SpdyDataInputStream(int bufferLength) { } @Override public int read(byte[] b, int offset, int count) throws IOException { + int copied = 0; synchronized (SpdyStream.this) { checkOffsetAndCount(b.length, offset, count); waitUntilReadable(); @@ -397,8 +390,6 @@ private SpdyDataInputStream(int bufferLength) { return -1; } - int copied = 0; - // drain from [pos..buffer.length) if (limit <= pos) { int bytesToCopy = Math.min(count, buffer.length - pos); @@ -419,19 +410,27 @@ private SpdyDataInputStream(int bufferLength) { } // Flow control: notify the peer that we're ready for more data! - unacknowledgedBytes += copied; - if (unacknowledgedBytes >= windowUpdateThreshold) { - connection.writeWindowUpdateLater(id, unacknowledgedBytes); - unacknowledgedBytes = 0; + unacknowledgedBytesRead += copied; + if (unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) { + connection.writeWindowUpdateLater(id, unacknowledgedBytesRead); + unacknowledgedBytesRead = 0; } if (pos == limit) { pos = -1; limit = 0; } - - return copied; } + // Update connection.unacknowledgedBytesRead outside the stream lock. + synchronized (connection) { // Multiple application threads may hit this section. + connection.unacknowledgedBytesRead += copied; + if (connection.unacknowledgedBytesRead + >= connection.okHttpSettings.getInitialWindowSize() / 2) { + connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead); + connection.unacknowledgedBytesRead = 0; + } + } + return copied; } /** diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java index 52169a5128e9..8f29368f5a69 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/SpdyConnectionTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; -import org.junit.Ignore; import org.junit.Test; import static com.squareup.okhttp.internal.Util.UTF_8; @@ -39,6 +38,7 @@ import static com.squareup.okhttp.internal.spdy.ErrorCode.REFUSED_STREAM; import static com.squareup.okhttp.internal.spdy.ErrorCode.STREAM_IN_USE; import static com.squareup.okhttp.internal.spdy.Settings.PERSIST_VALUE; +import static com.squareup.okhttp.internal.spdy.SpdyStream.OUTPUT_BUFFER_SIZE; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_DATA; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_GOAWAY; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_HEADERS; @@ -47,6 +47,7 @@ import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_RST_STREAM; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_SETTINGS; import static com.squareup.okhttp.internal.spdy.Spdy3.TYPE_WINDOW_UPDATE; +import static com.squareup.okhttp.internal.spdy.SpdyConnection.INITIAL_WINDOW_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -320,7 +321,7 @@ private MockSpdyPeer.InFrame replyWithNoData(Variant variant) throws Exception { // This stream was created *after* the connection settings were adjusted. SpdyStream stream = connection.newStream(headerEntries("a", "android"), true, true); - assertEquals(3368, connection.initialWindowSize); + assertEquals(3368, connection.peerSettings.getInitialWindowSize()); assertEquals(1684, connection.bytesLeftInWriteWindow); // initial wasn't affected. // New Stream is has the most recent initial window size. assertEquals(3368, stream.bytesLeftInWriteWindow); @@ -1052,27 +1053,25 @@ private void receiveGoAway(Variant variant) throws Exception { readSendsWindowUpdate(SPDY3); } - /** - * This test fails on http/2 as it tries to send too large data frame. In - * practice, {@link SpdyStream#OUTPUT_BUFFER_SIZE} prevents us from sending - * too large frames. The test should probably be rewritten to take into - * account max frame size per variant. - */ - @Test @Ignore public void readSendsWindowUpdateHttp2() throws Exception { + @Test public void readSendsWindowUpdateHttp2() throws Exception { readSendsWindowUpdate(HTTP_20_DRAFT_09); } private void readSendsWindowUpdate(Variant variant) throws IOException, InterruptedException { MockSpdyPeer peer = new MockSpdyPeer(variant, false); - int windowUpdateThreshold = 65535 / 2; + int windowUpdateThreshold = INITIAL_WINDOW_SIZE / 2; // Write the mocking script. peer.acceptFrame(); // SYN_STREAM peer.sendFrame().synReply(false, 1, headerEntries("a", "android")); for (int i = 0; i < 3; i++) { - peer.sendFrame().data(false, 1, new byte[windowUpdateThreshold]); - peer.acceptFrame(); // WINDOW UPDATE + peer.sendFrame().data(false, 1, new byte[OUTPUT_BUFFER_SIZE]); + peer.sendFrame().data(false, 1, new byte[OUTPUT_BUFFER_SIZE]); + peer.sendFrame().data(false, 1, new byte[OUTPUT_BUFFER_SIZE]); + peer.sendFrame().data(false, 1, new byte[windowUpdateThreshold - OUTPUT_BUFFER_SIZE * 3]); + peer.acceptFrame(); // connection WINDOW UPDATE + peer.acceptFrame(); // stream WINDOW UPDATE } peer.sendFrame().data(true, 1, new byte[0]); peer.play(); @@ -1080,7 +1079,7 @@ private void readSendsWindowUpdate(Variant variant) // Play it back. SpdyConnection connection = connection(peer, variant); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); - assertEquals(windowUpdateThreshold, stream.windowUpdateThreshold); + assertEquals(0, stream.unacknowledgedBytesRead); assertEquals(headerEntries("a", "android"), stream.getResponseHeaders()); InputStream in = stream.getInputStream(); int total = 0; @@ -1093,6 +1092,8 @@ private void readSendsWindowUpdate(Variant variant) assertEquals(-1, in.read()); // Verify the peer received what was expected. + assertEquals(21, peer.frameCount()); + MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_HEADERS, synStream.type); for (int i = 0; i < 3; i++) { @@ -1100,6 +1101,10 @@ private void readSendsWindowUpdate(Variant variant) assertEquals(TYPE_WINDOW_UPDATE, windowUpdate.type); assertEquals(1, windowUpdate.streamId); assertEquals(windowUpdateThreshold, windowUpdate.windowSizeIncrement); + windowUpdate = peer.takeFrame(); + assertEquals(TYPE_WINDOW_UPDATE, windowUpdate.type); + assertEquals(0, windowUpdate.streamId); // connection window update + assertEquals(windowUpdateThreshold, windowUpdate.windowSizeIncrement); } } @@ -1164,33 +1169,49 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) } @Test public void writeAwaitsWindowUpdate() throws Exception { - int windowSize = 65535; + int framesThatFillWindow = roundUp(INITIAL_WINDOW_SIZE, OUTPUT_BUFFER_SIZE); // Write the mocking script. This accepts more data frames than necessary! peer.acceptFrame(); // SYN_STREAM - for (int i = 0; i < windowSize / 1024; i++) { + for (int i = 0; i < framesThatFillWindow; i++) { peer.acceptFrame(); // DATA } + peer.acceptFrame(); // DATA we won't be able to flush until a window update. peer.play(); // Play it back. SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); SpdyStream stream = connection.newStream(headerEntries("b", "banana"), true, true); OutputStream out = stream.getOutputStream(); - out.write(new byte[windowSize]); + out.write(new byte[INITIAL_WINDOW_SIZE]); + out.flush(); + + // Check that we've filled the window for both the stream and also the connection. + assertEquals(0, connection.bytesLeftInWriteWindow); + assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); + out.write('a'); assertFlushBlocks(out); + // receiving a window update on the connection isn't enough. + connection.readerRunnable.windowUpdate(0, 1); + assertFlushBlocks(out); + + // receiving a window update on the stream will unblock the stream. + connection.readerRunnable.windowUpdate(1, 1); + out.flush(); + // Verify the peer received what was expected. MockSpdyPeer.InFrame synStream = peer.takeFrame(); assertEquals(TYPE_HEADERS, synStream.type); - MockSpdyPeer.InFrame data = peer.takeFrame(); - assertEquals(TYPE_DATA, data.type); + for (int i = 0; i < framesThatFillWindow; i++) { + MockSpdyPeer.InFrame data = peer.takeFrame(); + assertEquals(TYPE_DATA, data.type); + } } @Test public void initialSettingsWithWindowSizeAdjustsConnection() throws Exception { - int initialWindowSize = 65535; - int framesThatFillWindow = roundUp(initialWindowSize, SpdyStream.OUTPUT_BUFFER_SIZE); + int framesThatFillWindow = roundUp(INITIAL_WINDOW_SIZE, OUTPUT_BUFFER_SIZE); // Write the mocking script. This accepts more data frames than necessary! peer.acceptFrame(); // SYN_STREAM @@ -1204,7 +1225,7 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); SpdyStream stream = connection.newStream(headerEntries("a", "apple"), true, true); OutputStream out = stream.getOutputStream(); - out.write(new byte[initialWindowSize]); + out.write(new byte[INITIAL_WINDOW_SIZE]); out.flush(); // write 1 more than the window size @@ -1217,7 +1238,7 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) // Receiving a Settings with a larger window size will unblock the streams. Settings initial = new Settings(); - initial.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, initialWindowSize + 1); + initial.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, INITIAL_WINDOW_SIZE + 1); connection.readerRunnable.settings(false, initial); assertEquals(1, connection.bytesLeftInWriteWindow); @@ -1231,7 +1252,7 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) // Settings after the initial do not affect the connection window size. Settings next = new Settings(); - next.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, initialWindowSize + 2); + next.set(Settings.INITIAL_WINDOW_SIZE, PERSIST_VALUE, INITIAL_WINDOW_SIZE + 2); connection.readerRunnable.settings(false, next); assertEquals(0, connection.bytesLeftInWriteWindow); // connection wasn't affected. @@ -1259,8 +1280,7 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) } @Test public void blockedStreamDoesntStarveNewStream() throws Exception { - int initialWindowSize = 65535; - int framesThatFillWindow = roundUp(initialWindowSize, SpdyStream.OUTPUT_BUFFER_SIZE); + int framesThatFillWindow = roundUp(INITIAL_WINDOW_SIZE, SpdyStream.OUTPUT_BUFFER_SIZE); // Write the mocking script. This accepts more data frames than necessary! peer.acceptFrame(); // SYN_STREAM @@ -1274,7 +1294,7 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) SpdyConnection connection = new SpdyConnection.Builder(true, peer.openSocket()).build(); SpdyStream stream1 = connection.newStream(headerEntries("a", "apple"), true, true); OutputStream out1 = stream1.getOutputStream(); - out1.write(new byte[initialWindowSize]); + out1.write(new byte[INITIAL_WINDOW_SIZE]); out1.flush(); // Check that we've filled the window for both the stream and also the connection. @@ -1295,7 +1315,7 @@ private void clientSendsEmptyDataServerDoesntSendWindowUpdate(Variant variant) assertEquals(0, connection.bytesLeftInWriteWindow); assertEquals(0, connection.getStream(1).bytesLeftInWriteWindow); - assertEquals(initialWindowSize - 3, connection.getStream(3).bytesLeftInWriteWindow); + assertEquals(INITIAL_WINDOW_SIZE - 3, connection.getStream(3).bytesLeftInWriteWindow); } /**