diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java index 96063d54ce32..db62b6c28a3c 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java @@ -54,8 +54,8 @@ public BufferedSource(Source source, OkBuffer buffer) { * will block until there are bytes to read or the source is definitely * exhausted. */ - public boolean exhausted() throws IOException { - return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, Deadline.NONE) == -1; + public boolean exhausted(Deadline deadline) throws IOException { + return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, deadline) == -1; } /** @@ -84,11 +84,21 @@ public short readShort() throws IOException { return buffer.readShort(); } + public int readShortLe() throws IOException { + require(2, Deadline.NONE); + return buffer.readShortLe(); + } + public int readInt() throws IOException { require(4, Deadline.NONE); return buffer.readInt(); } + public int readIntLe() throws IOException { + require(4, Deadline.NONE); + return buffer.readIntLe(); + } + /** * Reads and discards {@code byteCount} bytes from {@code source} using {@code * buffer} as a buffer. Throws an {@link EOFException} if the source is @@ -105,6 +115,20 @@ public void skip(long byteCount, Deadline deadline) throws IOException { } } + /** + * Returns the index of {@code b} in the buffer, refilling it if necessary + * until it is found. This reads an unbounded number of bytes into the buffer. + */ + public long seek(byte b, Deadline deadline) throws IOException { + long start = 0; + long index; + while ((index = buffer.indexOf(b, start)) == -1) { + start = buffer.byteCount; + if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException(); + } + return index; + } + /** Returns an input stream that reads from this source. */ public InputStream inputStream() { return new InputStream() { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/GzipSource.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/GzipSource.java index c6cf1c4fea09..11e460d039b4 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/GzipSource.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/GzipSource.java @@ -15,7 +15,6 @@ */ package com.squareup.okhttp.internal.bytes; -import java.io.EOFException; import java.io.IOException; import java.util.zip.CRC32; import java.util.zip.Inflater; @@ -34,22 +33,12 @@ public final class GzipSource implements Source { /** The current section. Always progresses forward. */ private int section = SECTION_HEADER; - /** - * This buffer is carefully shared between this source and the InflaterSource - * it wraps. In particular, this source may read more bytes than necessary for - * the GZIP header; the InflaterSource will pick those up when it starts to - * read the compressed body. And the InflaterSource may read more bytes than - * necessary for the compressed body, and this source will pick those up for - * the GZIP trailer. - */ - private final OkBuffer buffer = new OkBuffer(); - /** * Our source should yield a GZIP header (which we consume directly), followed * by deflated bytes (which we consume via an InflaterSource), followed by a * GZIP trailer (which we also consume directly). */ - private final Source source; + private final BufferedSource source; /** The inflater used to decompress the deflated body. */ private final Inflater inflater; @@ -65,8 +54,8 @@ public final class GzipSource implements Source { public GzipSource(Source source) throws IOException { this.inflater = new Inflater(true); - this.source = source; - this.inflaterSource = new InflaterSource(source, inflater, buffer); + this.source = new BufferedSource(source, new OkBuffer()); + this.inflaterSource = new InflaterSource(this.source, inflater); } @Override public long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException { @@ -108,26 +97,26 @@ private void consumeHeader(Deadline deadline) throws IOException { // +---+---+---+---+---+---+---+---+---+---+ // |ID1|ID2|CM |FLG| MTIME |XFL|OS | (more-->) // +---+---+---+---+---+---+---+---+---+---+ - require(10, deadline); - byte flags = buffer.getByte(3); + source.require(10, deadline); + byte flags = source.buffer.getByte(3); boolean fhcrc = ((flags >> FHCRC) & 1) == 1; - if (fhcrc) updateCrc(buffer, 0, 10); + if (fhcrc) updateCrc(source.buffer, 0, 10); - short id1id2 = buffer.readShort(); + short id1id2 = source.readShort(); checkEqual("ID1ID2", (short) 0x1f8b, id1id2); - buffer.skip(8); + source.skip(8, deadline); // Skip optional extra fields. // +---+---+=================================+ // | XLEN |...XLEN bytes of "extra field"...| (more-->) // +---+---+=================================+ if (((flags >> FEXTRA) & 1) == 1) { - require(2, deadline); - if (fhcrc) updateCrc(buffer, 0, 2); - int xlen = buffer.readShortLe() & 0xffff; - require(xlen, deadline); - if (fhcrc) updateCrc(buffer, 0, xlen); - buffer.skip(xlen); + source.require(2, deadline); + if (fhcrc) updateCrc(source.buffer, 0, 2); + int xlen = source.buffer.readShortLe() & 0xffff; + source.require(xlen, deadline); + if (fhcrc) updateCrc(source.buffer, 0, xlen); + source.skip(xlen, deadline); } // Skip an optional 0-terminated name. @@ -135,9 +124,9 @@ private void consumeHeader(Deadline deadline) throws IOException { // |...original file name, zero-terminated...| (more-->) // +=========================================+ if (((flags >> FNAME) & 1) == 1) { - long index = OkBuffers.seek(buffer, (byte) 0, source, deadline); - if (fhcrc) updateCrc(buffer, 0, index + 1); - buffer.skip(index + 1); + long index = source.seek((byte) 0, deadline); + if (fhcrc) updateCrc(source.buffer, 0, index + 1); + source.buffer.skip(index + 1); } // Skip an optional 0-terminated comment. @@ -145,9 +134,9 @@ private void consumeHeader(Deadline deadline) throws IOException { // |...file comment, zero-terminated...| (more-->) // +===================================+ if (((flags >> FCOMMENT) & 1) == 1) { - long index = OkBuffers.seek(buffer, (byte) 0, source, deadline); - if (fhcrc) updateCrc(buffer, 0, index + 1); - buffer.skip(index + 1); + long index = source.seek((byte) 0, deadline); + if (fhcrc) updateCrc(source.buffer, 0, index + 1); + source.skip(index + 1, deadline); } // Confirm the optional header CRC. @@ -155,7 +144,7 @@ private void consumeHeader(Deadline deadline) throws IOException { // | CRC16 | // +---+---+ if (fhcrc) { - checkEqual("FHCRC", buffer.readShortLe(), (short) crc.getValue()); + checkEqual("FHCRC", source.readShortLe(), (short) crc.getValue()); crc.reset(); } } @@ -165,9 +154,8 @@ private void consumeTrailer(Deadline deadline) throws IOException { // +---+---+---+---+---+---+---+---+ // | CRC32 | ISIZE | // +---+---+---+---+---+---+---+---+ - require(8, deadline); - checkEqual("CRC", buffer.readIntLe(), (int) crc.getValue()); - checkEqual("ISIZE", buffer.readIntLe(), inflater.getTotalOut()); + checkEqual("CRC", source.readIntLe(), (int) crc.getValue()); + checkEqual("ISIZE", source.readIntLe(), inflater.getTotalOut()); } @Override public void close(Deadline deadline) throws IOException { @@ -187,13 +175,6 @@ private void updateCrc(OkBuffer buffer, long offset, long byteCount) { } } - /** Fills the buffer with at least {@code byteCount} bytes. */ - private void require(int byteCount, Deadline deadline) throws IOException { - while (buffer.byteCount < byteCount) { - if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException(); - } - } - private void checkEqual(String name, int expected, int actual) throws IOException { if (actual != expected) { throw new IOException(String.format( diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/InflaterSource.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/InflaterSource.java index 4383926eb0e3..ce67f12072ed 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/InflaterSource.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/InflaterSource.java @@ -22,10 +22,8 @@ /** A source that inflates another source. */ public final class InflaterSource implements Source { - private final Source source; + private final BufferedSource source; private final Inflater inflater; - /** This holds bytes read from the source, but not yet inflated. */ - private final OkBuffer buffer; /** * When we call Inflater.setInput(), the inflater keeps our byte array until @@ -36,15 +34,19 @@ public final class InflaterSource implements Source { private boolean closed; public InflaterSource(Source source, Inflater inflater) { - this(source, inflater, new OkBuffer()); + this(new BufferedSource(source, new OkBuffer()), inflater); } - InflaterSource(Source source, Inflater inflater, OkBuffer buffer) { + /** + * This package-private constructor shares a buffer with its trusted caller. + * In general we can't share a BufferedSource because the inflater holds input + * bytes until they are inflated. + */ + InflaterSource(BufferedSource source, Inflater inflater) { if (source == null) throw new IllegalArgumentException("source == null"); if (inflater == null) throw new IllegalArgumentException("inflater == null"); this.source = source; this.inflater = inflater; - this.buffer = buffer; } @Override public long read( @@ -87,13 +89,11 @@ public boolean refill(Deadline deadline) throws IOException { releaseInflatedBytes(); if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible? - // Refill the buffer with compressed data from the source. - if (buffer.byteCount == 0) { - if (source.read(buffer, Segment.SIZE, deadline) == -1) return true; - } + // If there are compressed bytes in the source, assign them to the inflater. + if (source.exhausted(deadline)) return true; // Assign buffer bytes to the inflater. - Segment head = buffer.head; + Segment head = source.buffer.head; bufferBytesHeldByInflater = head.limit - head.pos; inflater.setInput(head.data, head.pos, bufferBytesHeldByInflater); return false; @@ -104,13 +104,12 @@ private void releaseInflatedBytes() { if (bufferBytesHeldByInflater == 0) return; int toRelease = bufferBytesHeldByInflater - inflater.getRemaining(); bufferBytesHeldByInflater -= toRelease; - buffer.skip(toRelease); + source.buffer.skip(toRelease); } @Override public void close(Deadline deadline) throws IOException { if (closed) return; inflater.end(); - buffer.clear(); closed = true; source.close(deadline); } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java index ff4775c5e825..47c185a45209 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java @@ -15,7 +15,6 @@ */ package com.squareup.okhttp.internal.bytes; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -26,22 +25,6 @@ public final class OkBuffers { private OkBuffers() { } - /** - * Returns the index of {@code b} in {@code buffer}, refilling it if necessary - * until it is found. This reads an unbounded number of bytes into {@code - * buffer}. - */ - public static long seek(OkBuffer buffer, byte b, Source source, Deadline deadline) - throws IOException { - long start = 0; - long index; - while ((index = buffer.indexOf(b, start)) == -1) { - start = buffer.byteCount; - if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException(); - } - return index; - } - /** Returns a sink that writes to {@code out}. */ public static Sink sink(final OutputStream out) { return new Sink() { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java index 0c086c4d6291..9eeac7eb4840 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/FrameReader.java @@ -16,10 +16,10 @@ package com.squareup.okhttp.internal.spdy; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.List; /** Reads transport frames for SPDY/3 or HTTP/2. */ @@ -28,7 +28,8 @@ public interface FrameReader extends Closeable { boolean nextFrame(Handler handler) throws IOException; public interface Handler { - void data(boolean inFinished, int streamId, InputStream in, int length) throws IOException; + void data(boolean inFinished, int streamId, BufferedSource source, int length) + throws IOException; /** * Create or update incoming headers, creating the corresponding streams diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java index 9aaf3c69dbdf..418c4310e493 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java @@ -3,6 +3,7 @@ import com.squareup.okhttp.internal.BitArray; import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; +import com.squareup.okhttp.internal.bytes.Deadline; import com.squareup.okhttp.internal.bytes.OkBuffer; import com.squareup.okhttp.internal.bytes.Source; import java.io.IOException; @@ -180,7 +181,7 @@ private int evictToRecoverBytes(int bytesToRecover) { * set of emitted headers. */ void readHeaders() throws IOException { - while (!source.exhausted()) { + while (!source.exhausted(Deadline.NONE)) { int b = source.readByte() & 0xff; if (b == 0x80) { // 10000000 clearReferenceSet(); diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java index 98b8c7f7ff7e..9f2228a48918 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java @@ -187,7 +187,7 @@ private void readData(Handler handler, short length, byte flags, int streamId) throws IOException { boolean inFinished = (flags & FLAG_END_STREAM) != 0; // TODO: checkState open or half-closed (local) or raise STREAM_CLOSED - handler.data(inFinished, streamId, source.inputStream(), length); + handler.data(inFinished, streamId, source, length); } private void readPriority(Handler handler, short length, byte flags, int streamId) diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java index 0a4e629bae67..e97aeac1d7e7 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java @@ -189,7 +189,7 @@ static final class Reader implements FrameReader { } else { int streamId = w1 & 0x7fffffff; boolean inFinished = (flags & FLAG_FIN) != 0; - handler.data(inFinished, streamId, source.inputStream(), length); + handler.data(inFinished, streamId, source, length); return true; } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java index 2eb7a49c294a..c4bc9ff08ed6 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyConnection.java @@ -18,7 +18,9 @@ import com.squareup.okhttp.Protocol; import com.squareup.okhttp.internal.NamedRunnable; import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; +import com.squareup.okhttp.internal.bytes.Deadline; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -526,15 +528,15 @@ private Reader() { } } - @Override public void data(boolean inFinished, int streamId, InputStream in, int length) + @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { SpdyStream dataStream = getStream(streamId); if (dataStream == null) { writeSynResetLater(streamId, ErrorCode.INVALID_STREAM); - Util.skipByReading(in, length); + source.skip(length, Deadline.NONE); return; } - dataStream.receiveData(in, length); + dataStream.receiveData(source, length); if (inFinished) { dataStream.receiveFin(); } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java index e36c8ebadf0a..8166cb953e78 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/SpdyStream.java @@ -17,6 +17,11 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.bytes.BufferedSource; +import com.squareup.okhttp.internal.bytes.Deadline; +import com.squareup.okhttp.internal.bytes.OkBuffer; +import com.squareup.okhttp.internal.bytes.Source; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; @@ -35,9 +40,9 @@ public final class SpdyStream { // blocking operations are performed while the lock is held. /** - * The total number of bytes consumed by the application - * (with {@link SpdyDataInputStream#read}), but not yet acknowledged by - * sending a {@code WINDOW_UPDATE} frame on this stream. + * The total number of bytes consumed by the application (with {@link + * SpdyDataSource#read}), but not yet acknowledged by sending a {@code + * WINDOW_UPDATE} frame on this stream. */ // Visible for testing long unacknowledgedBytesRead = 0; @@ -61,7 +66,8 @@ public final class SpdyStream { /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */ private List
responseHeaders; - private final SpdyDataInputStream in; + private final SpdyDataSource source; + private final InputStream in; final SpdyDataOutputStream out; /** @@ -78,9 +84,10 @@ public final class SpdyStream { this.id = id; this.connection = connection; this.bytesLeftInWriteWindow = connection.peerSettings.getInitialWindowSize(); - this.in = new SpdyDataInputStream(connection.okHttpSettings.getInitialWindowSize()); + this.source = new SpdyDataSource(connection.okHttpSettings.getInitialWindowSize()); + this.in = new BufferedSource(source, new OkBuffer()).inputStream(); this.out = new SpdyDataOutputStream(); - this.in.finished = inFinished; + this.source.finished = inFinished; this.out.finished = outFinished; this.priority = priority; this.requestHeaders = requestHeaders; @@ -100,7 +107,9 @@ public synchronized boolean isOpen() { if (errorCode != null) { return false; } - if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) { + if ((source.finished || source.closed) + && (out.finished || out.closed) + && responseHeaders != null) { return false; } return true; @@ -251,7 +260,7 @@ private boolean closeInternal(ErrorCode errorCode) { if (this.errorCode != null) { return false; } - if (in.finished && out.finished) { + if (source.finished && out.finished) { return false; } this.errorCode = errorCode; @@ -292,16 +301,16 @@ void receiveHeaders(List
headers, HeadersMode headersMode) { } } - void receiveData(InputStream in, int length) throws IOException { + void receiveData(BufferedSource in, int length) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); - this.in.receive(in, length); + this.source.receive(in, length); } void receiveFin() { assert (!Thread.holdsLock(SpdyStream.this)); boolean open; synchronized (this) { - this.in.finished = true; + this.source.finished = true; open = isOpen(); notifyAll(); } @@ -322,36 +331,19 @@ int getPriority() { } /** - * An input stream that reads the incoming data frames of a stream. Although - * this class uses synchronization to safely receive incoming data frames, - * it is not intended for use by multiple readers. + * A source that reads the incoming data frames of a stream. Although this + * class uses synchronization to safely receive incoming data frames, it is + * not intended for use by multiple readers. */ - private final class SpdyDataInputStream extends InputStream { - - // Store incoming data bytes in a circular buffer. When the buffer is - // empty, pos == -1. Otherwise pos is the first byte to read and limit - // is the first byte to write. - // - // { - - - X X X X - - - } - // ^ ^ - // pos limit - // - // { X X X - - - - X X X } - // ^ ^ - // limit pos - private final byte[] buffer; - - private SpdyDataInputStream(int bufferLength) { - // TODO: We probably need to change to growable buffers here pretty soon. - // Otherwise we have a performance problem where we pay for 64 KiB even if we aren't using it. - buffer = connection.bufferPool.getBuf(bufferLength); - } + private final class SpdyDataSource implements Source { + /** Buffer to receive data from the network into. Only accessed by the reader thread. */ + private final OkBuffer receiveBuffer = new OkBuffer(); - /** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */ - private int pos = -1; + /** Buffer with readable data. Guarded by SpdyStream.this. */ + private final OkBuffer readBuffer = new OkBuffer(); - /** the last byte to be read. Never buffer.length */ - private int limit; + /** Maximum number of bytes to buffer before reporting a flow control error. */ + private final long maxByteCount; /** True if the caller has closed this stream. */ private boolean closed; @@ -362,75 +354,42 @@ private SpdyDataInputStream(int bufferLength) { */ private boolean finished; - @Override public int available() throws IOException { - synchronized (SpdyStream.this) { - checkNotClosed(); - if (pos == -1) { - return 0; - } else if (limit > pos) { - return limit - pos; - } else { - return limit + (buffer.length - pos); - } - } + private SpdyDataSource(long maxByteCount) { + this.maxByteCount = maxByteCount; } - @Override public int read() throws IOException { - return Util.readSingleByte(this); - } + @Override public long read(OkBuffer sink, long byteCount, Deadline deadline) + throws IOException { + if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); - @Override public int read(byte[] b, int offset, int count) throws IOException { - int copied = 0; + long read; synchronized (SpdyStream.this) { - checkOffsetAndCount(b.length, offset, count); waitUntilReadable(); checkNotClosed(); + if (readBuffer.byteCount() == 0) return -1; // This source is exhausted. - if (pos == -1) { - return -1; - } - - // drain from [pos..buffer.length) - if (limit <= pos) { - int bytesToCopy = Math.min(count, buffer.length - pos); - System.arraycopy(buffer, pos, b, offset, bytesToCopy); - pos += bytesToCopy; - copied += bytesToCopy; - if (pos == buffer.length) { - pos = 0; - } - } - - // drain from [pos..limit) - if (copied < count) { - int bytesToCopy = Math.min(limit - pos, count - copied); - System.arraycopy(buffer, pos, b, offset + copied, bytesToCopy); - pos += bytesToCopy; - copied += bytesToCopy; - } + // Move bytes from the read buffer into the caller's buffer. + read = readBuffer.read(sink, Math.min(byteCount, readBuffer.byteCount()), deadline); // Flow control: notify the peer that we're ready for more data! - unacknowledgedBytesRead += copied; + unacknowledgedBytesRead += read; if (unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) { connection.writeWindowUpdateLater(id, unacknowledgedBytesRead); unacknowledgedBytesRead = 0; } - - if (pos == limit) { - pos = -1; - limit = 0; - } } + // Update connection.unacknowledgedBytesRead outside the stream lock. synchronized (connection) { // Multiple application threads may hit this section. - connection.unacknowledgedBytesRead += copied; + connection.unacknowledgedBytesRead += read; if (connection.unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) { connection.writeWindowUpdateLater(0, connection.unacknowledgedBytesRead); connection.unacknowledgedBytesRead = 0; } } - return copied; + + return read; } /** @@ -446,7 +405,7 @@ private void waitUntilReadable() throws IOException { remaining = readTimeoutMillis; } try { - while (pos == -1 && !finished && !closed && errorCode == null) { + while (readBuffer.byteCount() == 0 && !finished && !closed && errorCode == null) { if (readTimeoutMillis == 0) { SpdyStream.this.wait(); } else if (remaining > 0) { @@ -461,71 +420,51 @@ private void waitUntilReadable() throws IOException { } } - void receive(InputStream in, int byteCount) throws IOException { + void receive(BufferedSource in, long byteCount) throws IOException { assert (!Thread.holdsLock(SpdyStream.this)); - if (byteCount == 0) { - return; - } - - int pos; - int limit; - int firstNewByte; - boolean finished; - boolean flowControlError; - synchronized (SpdyStream.this) { - finished = this.finished; - pos = this.pos; - firstNewByte = this.limit; - limit = this.limit; - flowControlError = byteCount > buffer.length - available(); - } - - // If the peer sends more data than we can handle, discard it and close the connection. - if (flowControlError) { - Util.skipByReading(in, byteCount); - closeLater(ErrorCode.FLOW_CONTROL_ERROR); - return; - } + while (byteCount > 0) { + boolean finished; + boolean flowControlError; + synchronized (SpdyStream.this) { + finished = this.finished; + flowControlError = byteCount + readBuffer.byteCount() > maxByteCount; + } - // Discard data received after the stream is finished. It's probably a benign race. - if (finished) { - Util.skipByReading(in, byteCount); - return; - } + // If the peer sends more data than we can handle, discard it and close the connection. + if (flowControlError) { + in.skip(byteCount, Deadline.NONE); + closeLater(ErrorCode.FLOW_CONTROL_ERROR); + return; + } - // Fill the buffer without holding any locks. First fill [limit..buffer.length) if that - // won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise - // writes will be blocked until reads complete. - if (pos < limit) { - int firstCopyCount = Math.min(byteCount, buffer.length - limit); - Util.readFully(in, buffer, limit, firstCopyCount); - limit += firstCopyCount; - byteCount -= firstCopyCount; - if (limit == buffer.length) { - limit = 0; + // Discard data received after the stream is finished. It's probably a benign race. + if (finished) { + in.skip(byteCount, Deadline.NONE); + return; } - } - if (byteCount > 0) { - Util.readFully(in, buffer, limit, byteCount); - limit += byteCount; - } - synchronized (SpdyStream.this) { - // Update the new limit, and mark the position as readable if necessary. - this.limit = limit; - if (this.pos == -1) { - this.pos = firstNewByte; - SpdyStream.this.notifyAll(); + // Fill the receive buffer without holding any locks. + long read = in.read(receiveBuffer, byteCount, Deadline.NONE); + if (read == -1) throw new EOFException(); + byteCount -= read; + + // Move the received data to the read buffer to the reader can read it. + synchronized (SpdyStream.this) { + boolean wasEmpty = readBuffer.byteCount() == 0; + readBuffer.write(receiveBuffer, receiveBuffer.byteCount(), Deadline.NONE); + if (wasEmpty) { + SpdyStream.this.notifyAll(); + } } } } - @Override public void close() throws IOException { + @Override public void close(Deadline deadline) throws IOException { synchronized (SpdyStream.this) { closed = true; + readBuffer.clear(); SpdyStream.this.notifyAll(); - SpdyStream.this.connection.bufferPool.returnBuf(buffer); } cancelStreamIfNecessary(); } @@ -545,7 +484,7 @@ private void cancelStreamIfNecessary() throws IOException { boolean open; boolean cancel; synchronized (this) { - cancel = !in.finished && in.closed && (out.finished || out.closed); + cancel = !source.finished && source.closed && (out.finished || out.closed); open = isOpen(); } if (cancel) { diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java index 67384e406ea2..e990a8ffdb27 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/BaseTestHandler.java @@ -15,15 +15,15 @@ */ package com.squareup.okhttp.internal.spdy; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import java.io.IOException; -import java.io.InputStream; import java.util.List; import static org.junit.Assert.fail; class BaseTestHandler implements FrameReader.Handler { - @Override public void data(boolean inFinished, int streamId, InputStream in, int length) + @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { fail(); } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java index 71ba37800fdd..be9e4e8a34e1 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/Http20Draft09Test.java @@ -16,12 +16,12 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import com.squareup.okhttp.internal.bytes.OkBuffer; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.List; import org.junit.Test; @@ -348,14 +348,13 @@ public void pushPromise(int streamId, int promisedStreamId, List
headerB FrameReader fr = newReader(out); fr.nextFrame(new BaseTestHandler() { - @Override public void data(boolean inFinished, int streamId, InputStream in, int length) - throws IOException { + @Override public void data( + boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { assertFalse(inFinished); assertEquals(expectedStreamId, streamId); assertEquals(16383, length); - byte[] data = new byte[length]; - Util.readFully(in, data); - for (byte b : data){ + ByteString data = source.readByteString(length); + for (byte b : data.toByteArray()){ assertEquals(2, b); } } diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java index bc51ea4049ac..e4a6b07dd9fb 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/spdy/MockSpdyPeer.java @@ -17,6 +17,7 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import java.io.ByteArrayOutputStream; import java.io.Closeable; @@ -39,7 +40,7 @@ public final class MockSpdyPeer implements Closeable { private boolean client = false; private Variant variant = new Spdy3(); private final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - private FrameWriter frameWriter = variant.newWriter(bytesOut, client);; + private FrameWriter frameWriter = variant.newWriter(bytesOut, client); private final List outFrames = new ArrayList(); private final BlockingQueue inFrames = new LinkedBlockingQueue(); private int port; @@ -232,14 +233,13 @@ public InFrame(int sequence, FrameReader reader) { this.headersMode = headersMode; } - @Override public void data(boolean inFinished, int streamId, InputStream in, int length) + @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length) throws IOException { if (this.type != -1) throw new IllegalStateException(); this.type = Spdy3.TYPE_DATA; this.inFinished = inFinished; this.streamId = streamId; - this.data = new byte[length]; - Util.readFully(in, this.data); + this.data = source.readByteString(length).toByteArray(); } @Override public void rstStream(int streamId, ErrorCode errorCode) {