Skip to content

Commit

Permalink
Merge pull request square#86 from square/jwilson/careful
Browse files Browse the repository at this point in the history
Be more careful around IOExceptions.
  • Loading branch information
swankjesse committed Jan 21, 2013
2 parents c37a793 + b9580d6 commit 514fbfd
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 132 deletions.
11 changes: 11 additions & 0 deletions src/main/java/com/squareup/okhttp/internal/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.URL;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -313,4 +314,14 @@ public static String readAsciiLine(InputStream in) throws IOException {
}
return result.toString();
}

public static ThreadFactory newThreadFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread result = new Thread(r, name);
result.setDaemon(daemon);
return result;
}
};
}
}
76 changes: 45 additions & 31 deletions src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ private SpdyConnection(Builder builder) {

String prefix = builder.client ? "Spdy Client " : "Spdy Server ";
readExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Threads.newThreadFactory(prefix + "Reader", false));
new SynchronousQueue<Runnable>(), Util.newThreadFactory(prefix + "Reader", false));
writeExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), Threads.newThreadFactory(prefix + "Writer", false));
new LinkedBlockingQueue<Runnable>(), Util.newThreadFactory(prefix + "Writer", false));
callbackExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Threads.newThreadFactory(prefix + "Callbacks", false));
new SynchronousQueue<Runnable>(), Util.newThreadFactory(prefix + "Callbacks", false));

readExecutor.execute(new Reader());
}
Expand Down Expand Up @@ -317,7 +317,17 @@ public void shutdown(int statusCode) throws IOException {
* internal executor services.
*/
@Override public void close() throws IOException {
shutdown(GOAWAY_OK);
close(GOAWAY_OK, SpdyStream.RST_CANCEL);
}

private void close(int shutdownStatusCode, int rstStatusCode) throws IOException {
assert (!Thread.holdsLock(this));
IOException thrown = null;
try {
shutdown(shutdownStatusCode);
} catch (IOException e) {
thrown = e;
}

SpdyStream[] streamsToClose = null;
Ping[] pingsToCancel = null;
Expand All @@ -335,8 +345,9 @@ public void shutdown(int statusCode) throws IOException {
if (streamsToClose != null) {
for (SpdyStream stream : streamsToClose) {
try {
stream.close(SpdyStream.RST_CANCEL);
} catch (Throwable ignored) {
stream.close(rstStatusCode);
} catch (IOException e) {
if (thrown != null) thrown = e;
}
}
}
Expand All @@ -350,7 +361,17 @@ public void shutdown(int statusCode) throws IOException {
writeExecutor.shutdown();
callbackExecutor.shutdown();
readExecutor.shutdown();
Util.closeAll(spdyReader, spdyWriter);
try {
spdyReader.close();
} catch (IOException e) {
thrown = e;
}
try {
spdyWriter.close();
} catch (IOException e) {
if (thrown == null) thrown = e;
}
if (thrown != null) throw thrown;
}

public static class Builder {
Expand Down Expand Up @@ -389,15 +410,21 @@ public SpdyConnection build() {

private class Reader implements Runnable, SpdyReader.Handler {
@Override public void run() {
int shutdownStatusCode = GOAWAY_INTERNAL_ERROR;
int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR;
try {
while (spdyReader.nextFrame(this)) {
}
} catch (ProtocolException e) {
shutdownLater(GOAWAY_PROTOCOL_ERROR);
shutdownStatusCode = GOAWAY_OK;
rstStatusCode = SpdyStream.RST_CANCEL;
} catch (IOException e) {
throw new RuntimeException(e);
shutdownStatusCode = GOAWAY_PROTOCOL_ERROR;
rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR;
} finally {
Util.closeQuietly(SpdyConnection.this);
try {
close(shutdownStatusCode, rstStatusCode);
} catch (IOException ignored) {
}
}
}

Expand All @@ -409,14 +436,9 @@ private class Reader implements Runnable, SpdyReader.Handler {
Util.skipByReading(in, length);
return;
}
try {
dataStream.receiveData(in, length);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
dataStream.receiveFin();
}
} catch (ProtocolException e) {
Util.skipByReading(in, length);
dataStream.closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
dataStream.receiveData(in, length);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
dataStream.receiveFin();
}
}

Expand Down Expand Up @@ -456,25 +478,17 @@ private class Reader implements Runnable, SpdyReader.Handler {
writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
return;
}
try {
replyStream.receiveReply(nameValueBlock);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
replyStream.receiveFin();
}
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_STREAM_IN_USE);
replyStream.receiveReply(nameValueBlock);
if ((flags & SpdyConnection.FLAG_FIN) != 0) {
replyStream.receiveFin();
}
}

@Override public void headers(int flags, int streamId, List<String> nameValueBlock)
throws IOException {
SpdyStream replyStream = getStream(streamId);
if (replyStream != null) {
try {
replyStream.receiveHeaders(nameValueBlock);
} catch (ProtocolException e) {
replyStream.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
replyStream.receiveHeaders(nameValueBlock);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.zip.InflaterInputStream;

/**
* Read version 2 SPDY frames.
* Read spdy/3 frames.
*/
final class SpdyReader implements Closeable {
static final byte[] DICTIONARY = ("\u0000\u0000\u0000\u0007options\u0000\u0000\u0000\u0004hea"
Expand Down
54 changes: 38 additions & 16 deletions src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import static java.nio.ByteOrder.BIG_ENDIAN;
import java.util.ArrayList;
Expand Down Expand Up @@ -101,6 +100,8 @@ public final class SpdyStream {

SpdyStream(int id, SpdyConnection connection, int flags, int priority, int slot,
List<String> requestHeaders, Settings settings) {
if (connection == null) throw new NullPointerException("connection == null");
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.priority = priority;
Expand All @@ -124,7 +125,8 @@ public final class SpdyStream {
* Returns true if this stream is open. A stream is open until either:
* <ul>
* <li>A {@code SYN_RESET} frame abnormally terminates the stream.
* <li>Both input and output streams have transmitted all data.
* <li>Both input and output streams have transmitted all data and
* headers.
* </ul>
* Note that the input stream may continue to yield data even after a stream
* reports itself as not open. This is because input data is buffered.
Expand All @@ -133,7 +135,7 @@ public synchronized boolean isOpen() {
if (rstStatusCode != -1) {
return false;
}
if ((in.finished || in.closed) && (out.finished || out.closed)) {
if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) {
return false;
}
return true;
Expand Down Expand Up @@ -287,25 +289,39 @@ private boolean closeInternal(int rstStatusCode) {

void receiveReply(List<String> strings) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
boolean streamInUseError = false;
boolean open = true;
synchronized (this) {
if (!isLocallyInitiated() || responseHeaders != null) {
throw new ProtocolException();
if (isLocallyInitiated() && responseHeaders == null) {
responseHeaders = strings;
open = isOpen();
notifyAll();
} else {
streamInUseError = true;
}
responseHeaders = strings;
notifyAll();
}
if (streamInUseError) {
closeLater(SpdyStream.RST_STREAM_IN_USE);
} else if (!open) {
connection.removeStream(id);
}
}

void receiveHeaders(List<String> headers) throws IOException {
assert (!Thread.holdsLock(SpdyStream.this));
boolean protocolError = false;
synchronized (this) {
if (responseHeaders == null) {
throw new ProtocolException();
if (responseHeaders != null) {
List<String> newHeaders = new ArrayList<String>();
newHeaders.addAll(responseHeaders);
newHeaders.addAll(headers);
this.responseHeaders = newHeaders;
} else {
protocolError = true;
}
List<String> newHeaders = new ArrayList<String>();
newHeaders.addAll(responseHeaders);
newHeaders.addAll(headers);
this.responseHeaders = newHeaders;
}
if (protocolError) {
closeLater(SpdyStream.RST_PROTOCOL_ERROR);
}
}

Expand Down Expand Up @@ -513,14 +529,20 @@ void receive(InputStream in, int byteCount) throws IOException {
int limit;
int firstNewByte;
boolean finished;
boolean flowControlError;
synchronized (SpdyStream.this) {
finished = this.finished;
pos = this.pos;
firstNewByte = this.limit;
limit = this.limit;
if (byteCount > buffer.length - available()) {
throw new ProtocolException();
}
flowControlError = byteCount > buffer.length - available();
}

// If the peer sends more data than we can handle, discard it and close the connection.
if (flowControlError) {
Util.skipByReading(in, byteCount);
closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
return;
}

// Discard data received after the stream is finished. It's probably a benign race.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.zip.Deflater;

/**
* Write version 2 SPDY frames.
* Write spdy/3 frames.
*/
final class SpdyWriter implements Closeable {
final DataOutputStream out;
Expand Down
34 changes: 0 additions & 34 deletions src/main/java/com/squareup/okhttp/internal/spdy/Threads.java

This file was deleted.

24 changes: 19 additions & 5 deletions src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class MockSpdyPeer implements Closeable {
private final BlockingQueue<InFrame> inFrames = new LinkedBlockingQueue<InFrame>();
private int port;
private final Executor executor = Executors.newCachedThreadPool(
Threads.newThreadFactory("MockSpdyPeer", true));
Util.newThreadFactory("MockSpdyPeer", true));
private ServerSocket serverSocket;
private Socket socket;

Expand All @@ -52,8 +52,17 @@ public void acceptFrame() {
}

public SpdyWriter sendFrame() {
OutFrame frame = new OutFrame(frameCount++, bytesOut.size());
outFrames.add(frame);
outFrames.add(new OutFrame(frameCount++, bytesOut.size(), Integer.MAX_VALUE));
return spdyWriter;
}

/**
* Sends a frame, truncated to {@code truncateToLength} bytes. This is only
* useful for testing error handling as the truncated frame will be
* malformed.
*/
public SpdyWriter sendTruncatedFrame(int truncateToLength) {
outFrames.add(new OutFrame(frameCount++, bytesOut.size(), truncateToLength));
return spdyWriter;
}

Expand Down Expand Up @@ -99,6 +108,7 @@ private void readAndWriteFrames() throws IOException {

if (nextOutFrame != null && nextOutFrame.sequence == i) {
int start = nextOutFrame.start;
int truncateToLength = nextOutFrame.truncateToLength;
int end;
if (outFramesIterator.hasNext()) {
nextOutFrame = outFramesIterator.next();
Expand All @@ -108,7 +118,8 @@ private void readAndWriteFrames() throws IOException {
}

// write a frame
out.write(outBytes, start, end - start);
int length = Math.min(end - start, truncateToLength);
out.write(outBytes, start, length);

} else {
// read a frame
Expand All @@ -117,6 +128,7 @@ private void readAndWriteFrames() throws IOException {
inFrames.add(inFrame);
}
}
Util.closeQuietly(socket);
}

public Socket openSocket() throws IOException {
Expand All @@ -139,10 +151,12 @@ public Socket openSocket() throws IOException {
private static class OutFrame {
private final int sequence;
private final int start;
private final int truncateToLength;

private OutFrame(int sequence, int start) {
private OutFrame(int sequence, int start, int truncateToLength) {
this.sequence = sequence;
this.start = start;
this.truncateToLength = truncateToLength;
}
}

Expand Down
Loading

0 comments on commit 514fbfd

Please sign in to comment.