Skip to content

Commit

Permalink
Merge pull request square#45 from square/jwilson/readhandler
Browse files Browse the repository at this point in the history
Use callbacks rather than fields to handle incoming frames.
  • Loading branch information
JakeWharton committed Oct 1, 2012
2 parents 277fa12 + 961d797 commit e6fa469
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 195 deletions.
153 changes: 68 additions & 85 deletions src/main/java/libcore/net/spdy/SpdyConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public final class SpdyConnection implements Closeable {
static final int FLAG_FIN = 0x1;
static final int FLAG_UNIDIRECTIONAL = 0x2;

static final int TYPE_EOF = -1;
static final int TYPE_DATA = 0x0;
static final int TYPE_SYN_STREAM = 0x1;
static final int TYPE_SYN_REPLY = 0x2;
Expand Down Expand Up @@ -120,18 +119,6 @@ public synchronized boolean isClient() {
return nextStreamId % 2 == 1;
}

/**
* Receive an incoming setting from a peer.
*/
private synchronized void receiveSettings(int flags, Settings settings) {
if (this.settings == null
|| (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) {
this.settings = settings;
} else {
this.settings.merge(settings);
}
}

private synchronized SpdyStream getStream(int id) {
return streams.get(id);
}
Expand Down Expand Up @@ -302,11 +289,11 @@ public SpdyConnection build() {
}
}

private class Reader implements Runnable {
private class Reader implements Runnable, SpdyReader.Handler {
@Override public void run() {
Throwable failure = null;
try {
while (readFrame()) {
while (spdyReader.nextFrame(this)) {
}
} catch (Throwable e) {
failure = e;
Expand All @@ -318,85 +305,81 @@ private class Reader implements Runnable {
}
}

/**
* Returns true unless the last frame has been read.
*/
private boolean readFrame() throws IOException {
switch (spdyReader.nextFrame()) {
case TYPE_EOF:
return false;

case TYPE_DATA:
SpdyStream dataStream = getStream(spdyReader.id);
if (dataStream != null) {
dataStream.receiveData(spdyReader.in, spdyReader.flags, spdyReader.length);
} else {
writeSynResetLater(spdyReader.id, SpdyStream.RST_INVALID_STREAM);
Streams.skipByReading(spdyReader.in, spdyReader.length);
}
return true;

case TYPE_SYN_STREAM:
final SpdyStream synStream = new SpdyStream(spdyReader.id, SpdyConnection.this,
spdyReader.nameValueBlock, spdyReader.flags);
SpdyStream previous = streams.put(spdyReader.id, synStream);
if (previous != null) {
previous.close(SpdyStream.RST_PROTOCOL_ERROR);
}
callbackExecutor.execute(new Runnable() {
@Override public void run() {
try {
handler.receive(synStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
return true;

case TYPE_SYN_REPLY:
SpdyStream replyStream = getStream(spdyReader.id);
if (replyStream != null) {
// TODO: honor incoming FLAG_FIN.
replyStream.receiveReply(spdyReader.nameValueBlock);
} else {
writeSynResetLater(spdyReader.id, SpdyStream.RST_INVALID_STREAM);
}
return true;
@Override public void data(int flags, int streamId, InputStream in, int length)
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream != null) {
dataStream.receiveData(in, flags, length);
} else {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
Streams.skipByReading(in, length);
}
}

case TYPE_RST_STREAM:
SpdyStream rstStream = removeStream(spdyReader.id);
if (rstStream != null) {
rstStream.receiveRstStream(spdyReader.statusCode);
@Override public void synStream(int flags, int streamId, int associatedStreamId,
int priority, List<String> nameValueBlock) {
final SpdyStream synStream = new SpdyStream(streamId, SpdyConnection.this,
nameValueBlock, flags);
SpdyStream previous;
synchronized (SpdyConnection.this) {
previous = streams.put(streamId, synStream);
}
if (previous != null) {
previous.close(SpdyStream.RST_PROTOCOL_ERROR);
return;
}
callbackExecutor.execute(new Runnable() {
@Override public void run() {
try {
handler.receive(synStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return true;
});
}

case SpdyConnection.TYPE_SETTINGS:
receiveSettings(spdyReader.flags, spdyReader.settings);
return true;
@Override public void synReply(int flags, int streamId, List<String> nameValueBlock)
throws IOException {
SpdyStream replyStream = getStream(streamId);
if (replyStream != null) {
// TODO: honor incoming FLAG_FIN.
replyStream.receiveReply(nameValueBlock);
} else {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
}
}

case SpdyConnection.TYPE_NOOP:
return true;
@Override public void rstStream(int flags, int streamId, int statusCode) {
SpdyStream rstStream = removeStream(streamId);
if (rstStream != null) {
rstStream.receiveRstStream(statusCode);
}
}

case SpdyConnection.TYPE_PING:
int id = spdyReader.id;
if (isClient() != (id % 2 == 1)) {
// Respond to a client ping if this is a server and vice versa.
writePingLater(id, null);
@Override public void settings(int flags, Settings newSettings) {
synchronized (SpdyConnection.this) {
if (settings == null
|| (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) {
settings = newSettings;
} else {
Ping ping = removePing(id);
if (ping != null) {
ping.receive();
}
settings.merge(newSettings);
}
return true;
}
}

case SpdyConnection.TYPE_GOAWAY:
case SpdyConnection.TYPE_HEADERS:
throw new UnsupportedOperationException();
@Override public void noop() {
}

default:
throw new IOException("Unexpected frame: " + Integer.toHexString(spdyReader.type));
@Override public void ping(int flags, int streamId) {
if (isClient() != (streamId % 2 == 1)) {
// Respond to a client ping if this is a server and vice versa.
writePingLater(streamId, null);
} else {
Ping ping = removePing(streamId);
if (ping != null) {
ping.receive();
}
}
}
}
Expand Down
Loading

0 comments on commit e6fa469

Please sign in to comment.