Skip to content

Commit

Permalink
Send window update after the peer sends half the limit on a stream or…
Browse files Browse the repository at this point in the history
… connection.
  • Loading branch information
Adrian Cole committed Jan 27, 2014
1 parent 3cf4546 commit 358169b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ void clear() {
Arrays.fill(values, 0);
}

void set(int id, int idFlags, int value) {
Settings set(int id, int idFlags, int value) {
if (id >= values.length) {
return; // Discard unknown settings.
return this; // Discard unknown settings.
}

int bit = 1 << id;
Expand All @@ -95,6 +95,7 @@ void set(int id, int idFlags, int value) {
}

values[id] = value;
return this;
}

/** Returns true if a value has been assigned for the setting {@code id}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,34 @@ public final class SpdyConnection implements Closeable {
private Map<Integer, Ping> pings;
private int nextPingId;

static final int INITIAL_WINDOW_SIZE = 65535;

/**
* Initial window size to use for the connection and new streams. Until the
* peer sends an update, this will is initialized to {@code 65535}.
* The total number of bytes consumed by the application, but not yet
* acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
*/
int initialWindowSize = 65535;
// Visible for testing
long unacknowledgedBytesRead = 0;

/**
* Count of bytes that can be written on the connection before receiving a
* window update.
*/
// Visible for testing
long bytesLeftInWriteWindow = initialWindowSize;
long bytesLeftInWriteWindow;

/** Settings we communicate to the peer. */
// TODO: Do we want to dynamically adjust settings, or KISS and only set once?
// Settings we might send include toggling push, adjusting compression table size.
final Settings okHttpSettings;
final Settings okHttpSettings = new Settings()
.set(Settings.INITIAL_WINDOW_SIZE, 0, INITIAL_WINDOW_SIZE);
// TODO: implement stream limit
// okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);

/** Settings we receive from the peer. */
// TODO: MWS will need to guard on this setting before attempting to push.
final Settings peerSettings;
final Settings peerSettings = new Settings()
.set(Settings.INITIAL_WINDOW_SIZE, 0, INITIAL_WINDOW_SIZE);

private boolean receivedInitialPeerSettings = false;
final FrameReader frameReader;
final FrameWriter frameWriter;
Expand All @@ -124,11 +134,8 @@ private SpdyConnection(Builder builder) {
} else {
throw new AssertionError(protocol);
}
okHttpSettings = new Settings();
peerSettings = new Settings();
// TODO: implement stream limit
// okHttpSettings.set(Settings.MAX_CONCURRENT_STREAMS, 0, max);
bufferPool = new ByteArrayPool(initialWindowSize * 8); // TODO: revisit size limit!
bytesLeftInWriteWindow = peerSettings.getInitialWindowSize();
bufferPool = new ByteArrayPool(INITIAL_WINDOW_SIZE * 8); // TODO: revisit size limit!
frameReader = variant.newReader(builder.in, client);
frameWriter = variant.newWriter(builder.out, client);

Expand Down Expand Up @@ -203,8 +210,7 @@ public SpdyStream newStream(List<Header> requestHeaders, boolean out, boolean in
}
streamId = nextStreamId;
nextStreamId += 2;
stream = new SpdyStream(
streamId, this, outFinished, inFinished, priority, requestHeaders, initialWindowSize);
stream = new SpdyStream(streamId, this, outFinished, inFinished, priority, requestHeaders);
if (stream.isOpen()) {
streams.put(streamId, stream);
setIdle(false);
Expand Down Expand Up @@ -286,11 +292,11 @@ void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
frameWriter.rstStream(streamId, statusCode);
}

void writeWindowUpdateLater(final int streamId, final long windowSizeIncrement) {
executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
executor.submit(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
@Override public void execute() {
try {
frameWriter.windowUpdate(streamId, windowSizeIncrement);
frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
} catch (IOException ignored) {
}
}
Expand Down Expand Up @@ -568,7 +574,7 @@ private Reader() {

// Create a stream.
final SpdyStream newStream = new SpdyStream(streamId, SpdyConnection.this, outFinished,
inFinished, priority, headerBlock, initialWindowSize);
inFinished, priority, headerBlock);
lastGoodStreamId = streamId;
streams.put(streamId, newStream);
executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
Expand Down Expand Up @@ -607,6 +613,7 @@ private Reader() {
long delta = 0;
SpdyStream[] streamsToNotify = null;
synchronized (SpdyConnection.this) {
int priorWriteWindowSize = peerSettings.getInitialWindowSize();
if (clearPrevious) {
peerSettings.clear();
} else {
Expand All @@ -616,9 +623,8 @@ private Reader() {
ackSettingsLater();
}
int peerInitialWindowSize = peerSettings.getInitialWindowSize();
if (peerInitialWindowSize != -1 && peerInitialWindowSize != initialWindowSize) {
delta = peerInitialWindowSize - initialWindowSize;
SpdyConnection.this.initialWindowSize = peerInitialWindowSize;
if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
delta = peerInitialWindowSize - priorWriteWindowSize;
if (!receivedInitialPeerSettings) {
addBytesToWriteWindow(delta);
receivedInitialPeerSettings = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ public final class SpdyStream {
// blocking operations are performed while the lock is held.

/**
* The number of unacknowledged bytes at which the input stream will send
* the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's
* window size, otherwise the remote peer will stop sending data on this
* stream. (Chrome 25 uses 5 MiB.)
* The total number of bytes consumed by the application
* (with {@link SpdyDataInputStream#read}), but not yet acknowledged by
* sending a {@code WINDOW_UPDATE} frame on this stream.
*/
int windowUpdateThreshold;
// Visible for testing
long unacknowledgedBytesRead = 0;

/**
* Count of bytes that can be written on the stream before receiving a
* window update. Even if this is positive, writes will block until there
* available bytes in {@code connection.bytesLeftInWriteWindow}.
*/
// guarded by this
long bytesLeftInWriteWindow = 0;
long bytesLeftInWriteWindow;

private final int id;
private final SpdyConnection connection;
Expand All @@ -72,14 +72,13 @@ public final class SpdyStream {
private ErrorCode errorCode = null;

SpdyStream(int id, SpdyConnection connection, boolean outFinished, boolean inFinished,
int priority, List<Header> requestHeaders, int initialWriteWindow) {
int priority, List<Header> requestHeaders) {
if (connection == null) throw new NullPointerException("connection == null");
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.bytesLeftInWriteWindow = initialWriteWindow;
this.windowUpdateThreshold = initialWriteWindow / 2;
this.in = new SpdyDataInputStream(initialWriteWindow);
this.bytesLeftInWriteWindow = connection.peerSettings.getInitialWindowSize();
this.in = new SpdyDataInputStream(connection.okHttpSettings.getInitialWindowSize());
this.out = new SpdyDataOutputStream();
this.in.finished = inFinished;
this.out.finished = outFinished;
Expand Down Expand Up @@ -363,13 +362,6 @@ private SpdyDataInputStream(int bufferLength) {
*/
private boolean finished;

/**
* The total number of bytes consumed by the application (with {@link
* #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE}
* frame.
*/
private int unacknowledgedBytes = 0;

@Override public int available() throws IOException {
synchronized (SpdyStream.this) {
checkNotClosed();
Expand All @@ -388,6 +380,7 @@ private SpdyDataInputStream(int bufferLength) {
}

@Override public int read(byte[] b, int offset, int count) throws IOException {
int copied = 0;
synchronized (SpdyStream.this) {
checkOffsetAndCount(b.length, offset, count);
waitUntilReadable();
Expand All @@ -397,8 +390,6 @@ private SpdyDataInputStream(int bufferLength) {
return -1;
}

int copied = 0;

// drain from [pos..buffer.length)
if (limit <= pos) {
int bytesToCopy = Math.min(count, buffer.length - pos);
Expand All @@ -419,19 +410,27 @@ private SpdyDataInputStream(int bufferLength) {
}

// Flow control: notify the peer that we're ready for more data!
unacknowledgedBytes += copied;
if (unacknowledgedBytes >= windowUpdateThreshold) {
connection.writeWindowUpdateLater(id, unacknowledgedBytes);
unacknowledgedBytes = 0;
unacknowledgedBytesRead += copied;
if (unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}

if (pos == limit) {
pos = -1;
limit = 0;
}

return copied;
}
// Update connection.unacknowledgedBytesRead outside the stream lock.
synchronized (connection) { // Multiple application threads may hit this section.
connection.unacknowledgedBytesRead += copied;
if (connection.unacknowledgedBytesRead
>= connection.okHttpSettings.getInitialWindowSize() / 2) {
connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead);
connection.unacknowledgedBytesRead = 0;
}
}
return copied;
}

/**
Expand Down
Loading

0 comments on commit 358169b

Please sign in to comment.