From 9c6a433725364dc8d4639e712aef8ca9c9eeec20 Mon Sep 17 00:00:00 2001 From: jwilson Date: Tue, 11 Feb 2014 20:48:38 -0500 Subject: [PATCH] BufferedSource. The Source API is nice for source implementors: no annoying skip method, no annoying available() method, just one API to supply bytes to the caller. But it isn't as nice of an API for source callers. It lacks convenient APIs! This bridges the gap. Calling code should use BufferedSource, and implementing code should implement Source. --- .../okhttp/internal/bytes/BufferedSource.java | 166 ++++++++++++++++++ .../okhttp/internal/bytes/OkBuffer.java | 4 +- .../okhttp/internal/bytes/OkBuffers.java | 90 ---------- .../okhttp/internal/spdy/HpackDraft05.java | 19 +- .../okhttp/internal/spdy/Http20Draft09.java | 75 ++++---- .../internal/spdy/NameValueBlockReader.java | 27 ++- .../squareup/okhttp/internal/spdy/Spdy3.java | 58 +++--- .../okhttp/internal/bytes/OkBufferTest.java | 60 +++---- 8 files changed, 272 insertions(+), 227 deletions(-) create mode 100644 okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java new file mode 100644 index 000000000000..96063d54ce32 --- /dev/null +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/BufferedSource.java @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; + +/** + * A source that keeps a buffer internally so that callers can do small reads + * without a performance penalty. + */ +public final class BufferedSource implements Source { + public final OkBuffer buffer; + public final Source source; + private boolean closed; + + public BufferedSource(Source source, OkBuffer buffer) { + this.buffer = buffer; + this.source = source; + } + + @Override public long read(OkBuffer sink, long byteCount, Deadline deadline) + throws IOException { + if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); + if (closed) throw new IllegalStateException("closed"); + + if (buffer.byteCount == 0) { + long read = source.read(buffer, Segment.SIZE, deadline); + if (read == -1) return -1; + } + + long toRead = Math.min(byteCount, buffer.byteCount); + return buffer.read(sink, toRead, deadline); + } + + /** + * Returns true if there are no more bytes in the buffer or the source. This + * will block until there are bytes to read or the source is definitely + * exhausted. + */ + public boolean exhausted() throws IOException { + return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, Deadline.NONE) == -1; + } + + /** + * Returns when the buffer contains at least {@code byteCount} bytes. Throws + * an {@link EOFException} if the source is exhausted before the required + * bytes can be read. + */ + void require(long byteCount, Deadline deadline) throws IOException { + while (buffer.byteCount < byteCount) { + if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException(); + } + } + + public byte readByte() throws IOException { + require(1, Deadline.NONE); + return buffer.readByte(); + } + + public ByteString readByteString(int byteCount) throws IOException { + require(byteCount, Deadline.NONE); + return buffer.readByteString(byteCount); + } + + public short readShort() throws IOException { + require(2, Deadline.NONE); + return buffer.readShort(); + } + + public int readInt() throws IOException { + require(4, Deadline.NONE); + return buffer.readInt(); + } + + /** + * Reads and discards {@code byteCount} bytes from {@code source} using {@code + * buffer} as a buffer. Throws an {@link EOFException} if the source is + * exhausted before the requested bytes can be skipped. + */ + public void skip(long byteCount, Deadline deadline) throws IOException { + while (byteCount > 0) { + if (buffer.byteCount == 0 && source.read(buffer, Segment.SIZE, deadline) == -1) { + throw new EOFException(); + } + long toSkip = Math.min(byteCount, buffer.byteCount()); + buffer.skip(toSkip); + byteCount -= toSkip; + } + } + + /** Returns an input stream that reads from this source. */ + public InputStream inputStream() { + return new InputStream() { + @Override public int read() throws IOException { + if (buffer.byteCount == 0) { + long count = source.read(buffer, Segment.SIZE, Deadline.NONE); + if (count == -1) return -1; + } + return buffer.readByte() & 0xff; + } + + @Override public int read(byte[] data, int offset, int byteCount) throws IOException { + checkOffsetAndCount(data.length, offset, byteCount); + + if (buffer.byteCount == 0) { + long count = source.read(buffer, Segment.SIZE, Deadline.NONE); + if (count == -1) return -1; + } + + Segment head = buffer.head; + int toCopy = Math.min(byteCount, head.limit - head.pos); + System.arraycopy(head.data, head.pos, data, offset, toCopy); + + head.pos += toCopy; + buffer.byteCount -= toCopy; + + if (head.pos == head.limit) { + buffer.head = head.pop(); + SegmentPool.INSTANCE.recycle(head); + } + + return toCopy; + } + + @Override public int available() throws IOException { + return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE); + } + + @Override public void close() throws IOException { + BufferedSource.this.close(Deadline.NONE); + } + + @Override public String toString() { + return BufferedSource.this.toString() + ".inputStream()"; + } + }; + } + + @Override public void close(Deadline deadline) throws IOException { + if (closed) return; + closed = true; + source.close(deadline); + buffer.clear(); + } + + @Override public String toString() { + return "BufferedSource(" + source + ")"; + } +} diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java index ecea127c15e0..8352908499af 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffer.java @@ -367,7 +367,9 @@ Segment writableSegment(int minimumCapacity) { // an equivalent buffer [30%, 62%, 82%] and then move the head segment, // yielding sink [51%, 91%, 30%] and source [62%, 82%]. - if (source == this) throw new IllegalArgumentException("source == this"); + if (source == this) { + throw new IllegalArgumentException("source == this"); + } checkOffsetAndCount(source.byteCount, 0, byteCount); while (byteCount > 0) { diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java index c6f4af837981..ff4775c5e825 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/bytes/OkBuffers.java @@ -42,35 +42,6 @@ public static long seek(OkBuffer buffer, byte b, Source source, Deadline deadlin return index; } - /** - * Returns when {@code sink} contains at least {@code byteCount} bytes. Throws - * an {@link EOFException} if the source is exhausted before the requested - * bytes can be read. - */ - public static void require(Source source, OkBuffer sink, long byteCount, Deadline deadline) - throws IOException { - while (sink.byteCount < byteCount) { - if (source.read(sink, Segment.SIZE, deadline) == -1) throw new EOFException(); - } - } - - /** - * Reads and discards {@code byteCount} bytes from {@code source} using {@code - * buffer} as a buffer. Throws an {@link EOFException} if the source is - * exhausted before the requested bytes can be skipped. - */ - public static void skip(Source source, OkBuffer buffer, long byteCount, Deadline deadline) - throws IOException { - while (byteCount > 0) { - if (buffer.byteCount == 0 && source.read(buffer, Segment.SIZE, deadline) == -1) { - throw new EOFException(); - } - long toSkip = Math.min(byteCount, buffer.byteCount()); - buffer.skip(toSkip); - byteCount -= toSkip; - } - } - /** Returns a sink that writes to {@code out}. */ public static Sink sink(final OutputStream out) { return new Sink() { @@ -180,65 +151,4 @@ public static Source source(final InputStream in) { } }; } - - /** - * Returns an input stream that reads from {@code source}. This may buffer - * data by reading extra data eagerly. - */ - public static InputStream inputStream(final Source source) { - return inputStream(source, new OkBuffer()); - } - - /** - * Returns a buffered input stream that reads from {@code source}, with {@code - * buffer} as a buffer. Bytes are drawn from {@code buffer}, which is refilled - * from {@code source} when it is empty. This may read extra data eagerly into - * {@code buffer}. - */ - public static InputStream inputStream(final Source source, final OkBuffer buffer) { - return new InputStream() { - @Override public int read() throws IOException { - if (buffer.byteCount == 0) { - long count = source.read(buffer, Segment.SIZE, Deadline.NONE); - if (count == -1) return -1; - } - return buffer.readByte() & 0xff; - } - - @Override public int read(byte[] data, int offset, int byteCount) throws IOException { - checkOffsetAndCount(data.length, offset, byteCount); - - if (buffer.byteCount == 0) { - long count = source.read(buffer, Segment.SIZE, Deadline.NONE); - if (count == -1) return -1; - } - - Segment head = buffer.head; - int toCopy = Math.min(byteCount, head.limit - head.pos); - System.arraycopy(head.data, head.pos, data, offset, toCopy); - - head.pos += toCopy; - buffer.byteCount -= toCopy; - - if (head.pos == head.limit) { - buffer.head = head.pop(); - SegmentPool.INSTANCE.recycle(head); - } - - return toCopy; - } - - @Override public int available() throws IOException { - return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE); - } - - @Override public void close() throws IOException { - source.close(Deadline.NONE); - } - - @Override public String toString() { - return "inputStream(" + source + ")"; - } - }; - } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java index 1f0ea21ff081..9aaf3c69dbdf 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/HpackDraft05.java @@ -1,10 +1,9 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.internal.BitArray; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; -import com.squareup.okhttp.internal.bytes.Deadline; import com.squareup.okhttp.internal.bytes.OkBuffer; -import com.squareup.okhttp.internal.bytes.OkBuffers; import com.squareup.okhttp.internal.bytes.Source; import java.io.IOException; import java.io.OutputStream; @@ -100,8 +99,7 @@ static final class Reader { private final Huffman.Codec huffmanCodec; private final List
emittedHeaders = new ArrayList
(); - private final Source source; - private final OkBuffer buffer = new OkBuffer(); + private final BufferedSource source; private int maxHeaderTableByteCount; // Visible for testing. @@ -127,7 +125,7 @@ static final class Reader { Reader(boolean client, int maxHeaderTableByteCount, Source source) { this.huffmanCodec = client ? Huffman.Codec.RESPONSE : Huffman.Codec.REQUEST; this.maxHeaderTableByteCount = maxHeaderTableByteCount; - this.source = source; + this.source = new BufferedSource(source, new OkBuffer()); } int maxHeaderTableByteCount() { @@ -182,9 +180,8 @@ private int evictToRecoverBytes(int bytesToRecover) { * set of emitted headers. */ void readHeaders() throws IOException { - while (buffer.byteCount() > 0 - || source.read(buffer, 2048, Deadline.NONE) != -1) { - int b = buffer.readByte() & 0xff; + while (!source.exhausted()) { + int b = source.readByte() & 0xff; if (b == 0x80) { // 10000000 clearReferenceSet(); } else if ((b & 0x80) == 0x80) { // 1NNNNNNN @@ -335,8 +332,7 @@ private void insertIntoHeaderTable(int index, Header entry) { } private int readByte() throws IOException { - OkBuffers.require(source, buffer, 1, Deadline.NONE); - return buffer.readByte() & 0xff; + return source.readByte() & 0xff; } int readInt(int firstByte, int prefixMask) throws IOException { @@ -375,8 +371,7 @@ ByteString readByteString(boolean asciiLowercase) throws IOException { huffmanDecode = true; } - OkBuffers.require(source, buffer, length, Deadline.NONE); - ByteString byteString = buffer.readByteString(length); + ByteString byteString = source.readByteString(length); if (huffmanDecode) { byteString = huffmanCodec.decode(byteString); // TODO: streaming Huffman! diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java index 365f4a733cf9..98b8c7f7ff7e 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Http20Draft09.java @@ -16,6 +16,7 @@ package com.squareup.okhttp.internal.spdy; import com.squareup.okhttp.Protocol; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import com.squareup.okhttp.internal.bytes.Deadline; import com.squareup.okhttp.internal.bytes.OkBuffer; @@ -68,8 +69,7 @@ public final class Http20Draft09 implements Variant { } static final class Reader implements FrameReader { - private final OkBuffer buffer = new OkBuffer(); - private final Source source; + private final BufferedSource source; private final ContinuationSource continuation; private final boolean client; @@ -77,29 +77,29 @@ static final class Reader implements FrameReader { final HpackDraft05.Reader hpackReader; Reader(Source source, int headerTableSize, boolean client) { - this.source = source; + this.source = new BufferedSource(source, new OkBuffer()); this.client = client; - this.continuation = new ContinuationSource(source, buffer); + this.continuation = new ContinuationSource(this.source); this.hpackReader = new HpackDraft05.Reader(client, headerTableSize, continuation); } @Override public void readConnectionHeader() throws IOException { if (client) return; // Nothing to read; servers don't send connection headers! - OkBuffers.require(source, buffer, CONNECTION_HEADER.size(), Deadline.NONE); - ByteString connectionHeader = buffer.readByteString(CONNECTION_HEADER.size()); + ByteString connectionHeader = source.readByteString(CONNECTION_HEADER.size()); if (!CONNECTION_HEADER.equals(connectionHeader)) { throw ioException("Expected a connection header but was %s", connectionHeader.utf8()); } } @Override public boolean nextFrame(Handler handler) throws IOException { + int w1; + int w2; try { - OkBuffers.require(source, buffer, 8, Deadline.NONE); + w1 = source.readInt(); + w2 = source.readInt(); } catch (IOException e) { return false; // This might be a normal socket close. } - int w1 = buffer.readInt(); - int w2 = buffer.readInt(); // boolean r = (w1 & 0xc0000000) != 0; // Reserved: Ignore first 2 bits. short length = (short) ((w1 & 0x3fff0000) >> 16); // 14-bit unsigned == max 16383 @@ -147,7 +147,7 @@ static final class Reader implements FrameReader { default: // Implementations MUST ignore frames of unsupported or unrecognized types. - OkBuffers.skip(source, buffer, length, Deadline.NONE); + source.skip(length, Deadline.NONE); } return true; } @@ -160,8 +160,7 @@ private void readHeaders(Handler handler, short length, byte flags, int streamId int priority = -1; if ((flags & FLAG_PRIORITY) != 0) { - OkBuffers.require(source, buffer, 4, Deadline.NONE); - priority = buffer.readInt() & 0x7fffffff; + priority = source.readInt() & 0x7fffffff; length -= 4; // account for above read. } @@ -188,15 +187,14 @@ private void readData(Handler handler, short length, byte flags, int streamId) throws IOException { boolean inFinished = (flags & FLAG_END_STREAM) != 0; // TODO: checkState open or half-closed (local) or raise STREAM_CLOSED - handler.data(inFinished, streamId, OkBuffers.inputStream(source, buffer), length); + handler.data(inFinished, streamId, source.inputStream(), length); } private void readPriority(Handler handler, short length, byte flags, int streamId) throws IOException { if (length != 4) throw ioException("TYPE_PRIORITY length: %d != 4", length); if (streamId == 0) throw ioException("TYPE_PRIORITY streamId == 0"); - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int w1 = buffer.readInt(); + int w1 = source.readInt(); // boolean r = (w1 & 0x80000000) != 0; // Reserved. int priority = (w1 & 0x7fffffff); handler.priority(streamId, priority); @@ -206,8 +204,7 @@ private void readRstStream(Handler handler, short length, byte flags, int stream throws IOException { if (length != 4) throw ioException("TYPE_RST_STREAM length: %d != 4", length); if (streamId == 0) throw ioException("TYPE_RST_STREAM streamId == 0"); - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int errorCodeInt = buffer.readInt(); + int errorCodeInt = source.readInt(); ErrorCode errorCode = ErrorCode.fromHttp2(errorCodeInt); if (errorCode == null) { throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); @@ -226,10 +223,9 @@ private void readSettings(Handler handler, short length, byte flags, int streamI if (length % 8 != 0) throw ioException("TYPE_SETTINGS length %% 8 != 0: %s", length); Settings settings = new Settings(); - OkBuffers.require(source, buffer, length, Deadline.NONE); for (int i = 0; i < length; i += 8) { - int w1 = buffer.readInt(); - int value = buffer.readInt(); + int w1 = source.readInt(); + int value = source.readInt(); // int r = (w1 & 0xff000000) >>> 24; // Reserved. int id = w1 & 0xffffff; settings.set(id, 0, value); @@ -245,8 +241,7 @@ private void readPushPromise(Handler handler, short length, byte flags, int stre if (streamId == 0) { throw ioException("PROTOCOL_ERROR: TYPE_PUSH_PROMISE streamId == 0"); } - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int promisedStreamId = buffer.readInt() & 0x7fffffff; + int promisedStreamId = source.readInt() & 0x7fffffff; length -= 4; // account for above read. List
headerBlock = readHeaderBlock(length, flags, streamId); handler.pushPromise(streamId, promisedStreamId, headerBlock); @@ -256,9 +251,8 @@ private void readPing(Handler handler, short length, byte flags, int streamId) throws IOException { if (length != 8) throw ioException("TYPE_PING length != 8: %s", length); if (streamId != 0) throw ioException("TYPE_PING streamId != 0"); - OkBuffers.require(source, buffer, 8, Deadline.NONE); - int payload1 = buffer.readInt(); - int payload2 = buffer.readInt(); + int payload1 = source.readInt(); + int payload2 = source.readInt(); boolean ack = (flags & FLAG_ACK) != 0; handler.ping(ack, payload1, payload2); } @@ -267,9 +261,8 @@ private void readGoAway(Handler handler, short length, byte flags, int streamId) throws IOException { if (length < 8) throw ioException("TYPE_GOAWAY length < 8: %s", length); if (streamId != 0) throw ioException("TYPE_GOAWAY streamId != 0"); - OkBuffers.require(source, buffer, 8, Deadline.NONE); - int lastStreamId = buffer.readInt(); - int errorCodeInt = buffer.readInt(); + int lastStreamId = source.readInt(); + int errorCodeInt = source.readInt(); int opaqueDataLength = length - 8; ErrorCode errorCode = ErrorCode.fromHttp2(errorCodeInt); if (errorCode == null) { @@ -277,8 +270,7 @@ private void readGoAway(Handler handler, short length, byte flags, int streamId) } ByteString debugData = ByteString.EMPTY; if (opaqueDataLength > 0) { // Must read debug data in order to not corrupt the connection. - OkBuffers.require(source, buffer, opaqueDataLength, Deadline.NONE); - debugData = buffer.readByteString(opaqueDataLength); + debugData = source.readByteString(opaqueDataLength); } handler.goAway(lastStreamId, errorCode, debugData); } @@ -286,8 +278,7 @@ private void readGoAway(Handler handler, short length, byte flags, int streamId) private void readWindowUpdate(Handler handler, short length, byte flags, int streamId) throws IOException { if (length != 4) throw ioException("TYPE_WINDOW_UPDATE length !=4: %s", length); - OkBuffers.require(source, buffer, 4, Deadline.NONE); - long increment = (buffer.readInt() & 0x7fffffff); + long increment = (source.readInt() & 0x7fffffff); if (increment == 0) throw ioException("windowSizeIncrement was 0", increment); handler.windowUpdate(streamId, increment); } @@ -490,8 +481,7 @@ private static IOException ioException(String message, Object... args) throws IO * HpackDraft05.Reader#readHeaders()}. */ static final class ContinuationSource implements Source { - private final Source source; - private final OkBuffer buffer; + private final BufferedSource source; int length; byte flags; @@ -499,23 +489,19 @@ static final class ContinuationSource implements Source { int left; - public ContinuationSource(Source source, OkBuffer buffer) { + public ContinuationSource(BufferedSource source) { this.source = source; - this.buffer = buffer; } @Override public long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException { while (left == 0) { if ((flags & FLAG_END_HEADERS) != 0) return -1; - readContinuationHeader(deadline); + readContinuationHeader(); // TODO: test case for empty continuation header? } - long toRead = Math.min(byteCount, left); - long read = buffer.byteCount() > 0 - ? buffer.read(sink, toRead, deadline) - : source.read(sink, toRead, deadline); + long read = source.read(sink, Math.min(byteCount, left), deadline); if (read == -1) return -1; left -= read; return read; @@ -524,11 +510,10 @@ public ContinuationSource(Source source, OkBuffer buffer) { @Override public void close(Deadline deadline) throws IOException { } - private void readContinuationHeader(Deadline deadline) throws IOException { - OkBuffers.require(source, buffer, 8, deadline); + private void readContinuationHeader() throws IOException { int previousStreamId = streamId; - int w1 = buffer.readInt(); - int w2 = buffer.readInt(); + int w1 = source.readInt(); + int w2 = source.readInt(); length = left = (short) ((w1 & 0x3fff0000) >> 16); byte type = (byte) ((w1 & 0xff00) >> 8); flags = (byte) (w1 & 0xff); diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/NameValueBlockReader.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/NameValueBlockReader.java index 9efdbdf75100..b78ac60f346d 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/NameValueBlockReader.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/NameValueBlockReader.java @@ -1,10 +1,10 @@ package com.squareup.okhttp.internal.spdy; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import com.squareup.okhttp.internal.bytes.Deadline; import com.squareup.okhttp.internal.bytes.InflaterSource; import com.squareup.okhttp.internal.bytes.OkBuffer; -import com.squareup.okhttp.internal.bytes.OkBuffers; import com.squareup.okhttp.internal.bytes.Source; import java.io.IOException; import java.util.ArrayList; @@ -28,10 +28,10 @@ class NameValueBlockReader { */ private int compressedLimit; - /** This buffer holds inflated bytes read from inflaterSource. */ - private final OkBuffer inflatedBuffer = new OkBuffer(); + /** This source holds inflated bytes. */ + private final BufferedSource source; - public NameValueBlockReader(final OkBuffer sourceBuffer, final Source source) { + public NameValueBlockReader(final BufferedSource source) { // Limit the inflater input stream to only those bytes in the Name/Value // block. We cut the inflater off at its source because we can't predict the // ratio of compressed bytes to uncompressed bytes. @@ -39,10 +39,7 @@ public NameValueBlockReader(final OkBuffer sourceBuffer, final Source source) { @Override public long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException { if (compressedLimit == 0) return -1; // Out of data for the current block. - byteCount = Math.min(byteCount, compressedLimit); - long read = sourceBuffer.byteCount() > 0 - ? sourceBuffer.read(sink, byteCount, deadline) - : source.read(sink, byteCount, deadline); + long read = source.read(sink, Math.min(byteCount, compressedLimit), deadline); if (read == -1) return -1; compressedLimit -= read; return read; @@ -66,14 +63,14 @@ public NameValueBlockReader(final OkBuffer sourceBuffer, final Source source) { } }; - inflaterSource = new InflaterSource(throttleSource, inflater); + this.inflaterSource = new InflaterSource(throttleSource, inflater); + this.source = new BufferedSource(inflaterSource, new OkBuffer()); } public List
readNameValueBlock(int length) throws IOException { this.compressedLimit += length; - OkBuffers.require(inflaterSource, inflatedBuffer, 4, Deadline.NONE); - int numberOfPairs = inflatedBuffer.readInt(); + int numberOfPairs = source.readInt(); if (numberOfPairs < 0) throw new IOException("numberOfPairs < 0: " + numberOfPairs); if (numberOfPairs > 1024) throw new IOException("numberOfPairs > 1024: " + numberOfPairs); @@ -90,10 +87,8 @@ public List
readNameValueBlock(int length) throws IOException { } private ByteString readByteString() throws IOException { - OkBuffers.require(inflaterSource, inflatedBuffer, 4, Deadline.NONE); - int length = inflatedBuffer.readInt(); - OkBuffers.require(inflaterSource, inflatedBuffer, length, Deadline.NONE); - return inflatedBuffer.readByteString(length); + int length = source.readInt(); + return source.readByteString(length); } private void doneReading() throws IOException { @@ -107,6 +102,6 @@ private void doneReading() throws IOException { } public void close(Deadline deadline) throws IOException { - inflaterSource.close(deadline); + source.close(deadline); } } diff --git a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java index d9c6ee1b1fe5..0a4e629bae67 100644 --- a/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java +++ b/okhttp-protocols/src/main/java/com/squareup/okhttp/internal/spdy/Spdy3.java @@ -18,6 +18,7 @@ import com.squareup.okhttp.Protocol; import com.squareup.okhttp.internal.Platform; import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.bytes.BufferedSource; import com.squareup.okhttp.internal.bytes.ByteString; import com.squareup.okhttp.internal.bytes.Deadline; import com.squareup.okhttp.internal.bytes.OkBuffer; @@ -109,14 +110,13 @@ final class Spdy3 implements Variant { /** Read spdy/3 frames. */ static final class Reader implements FrameReader { - private final OkBuffer buffer = new OkBuffer(); - private final Source source; + private final BufferedSource source; private final boolean client; private final NameValueBlockReader headerBlockReader; Reader(Source source, boolean client) { - this.source = source; - this.headerBlockReader = new NameValueBlockReader(buffer, source); + this.source = new BufferedSource(source, new OkBuffer()); + this.headerBlockReader = new NameValueBlockReader(this.source); this.client = client; } @@ -128,13 +128,14 @@ static final class Reader implements FrameReader { * more frames on the stream. */ @Override public boolean nextFrame(Handler handler) throws IOException { + int w1; + int w2; try { - OkBuffers.require(source, buffer, 8, Deadline.NONE); + w1 = source.readInt(); + w2 = source.readInt(); } catch (IOException e) { return false; // This might be a normal socket close. } - int w1 = buffer.readInt(); - int w2 = buffer.readInt(); boolean control = (w1 & 0x80000000) != 0; int flags = (w2 & 0xff000000) >>> 24; @@ -182,22 +183,21 @@ static final class Reader implements FrameReader { return true; default: - OkBuffers.skip(source, buffer, length, Deadline.NONE); + source.skip(length, Deadline.NONE); return true; } } else { int streamId = w1 & 0x7fffffff; boolean inFinished = (flags & FLAG_FIN) != 0; - handler.data(inFinished, streamId, OkBuffers.inputStream(source, buffer), length); + handler.data(inFinished, streamId, source.inputStream(), length); return true; } } private void readSynStream(Handler handler, int flags, int length) throws IOException { - OkBuffers.require(source, buffer, 12, Deadline.NONE); - int w1 = buffer.readInt(); - int w2 = buffer.readInt(); - int s3 = buffer.readShort(); + int w1 = source.readInt(); + int w2 = source.readInt(); + int s3 = source.readShort(); int streamId = w1 & 0x7fffffff; int associatedStreamId = w2 & 0x7fffffff; int priority = (s3 & 0xe000) >>> 13; @@ -211,8 +211,7 @@ private void readSynStream(Handler handler, int flags, int length) throws IOExce } private void readSynReply(Handler handler, int flags, int length) throws IOException { - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int w1 = buffer.readInt(); + int w1 = source.readInt(); int streamId = w1 & 0x7fffffff; List
headerBlock = headerBlockReader.readNameValueBlock(length - 4); boolean inFinished = (flags & FLAG_FIN) != 0; @@ -221,9 +220,8 @@ private void readSynReply(Handler handler, int flags, int length) throws IOExcep private void readRstStream(Handler handler, int flags, int length) throws IOException { if (length != 8) throw ioException("TYPE_RST_STREAM length: %d != 8", length); - OkBuffers.require(source, buffer, 8, Deadline.NONE); - int streamId = buffer.readInt() & 0x7fffffff; - int errorCodeInt = buffer.readInt(); + int streamId = source.readInt() & 0x7fffffff; + int errorCodeInt = source.readInt(); ErrorCode errorCode = ErrorCode.fromSpdy3Rst(errorCodeInt); if (errorCode == null) { throw ioException("TYPE_RST_STREAM unexpected error code: %d", errorCodeInt); @@ -232,8 +230,7 @@ private void readRstStream(Handler handler, int flags, int length) throws IOExce } private void readHeaders(Handler handler, int flags, int length) throws IOException { - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int w1 = buffer.readInt(); + int w1 = source.readInt(); int streamId = w1 & 0x7fffffff; List
headerBlock = headerBlockReader.readNameValueBlock(length - 4); handler.headers(false, false, streamId, -1, -1, headerBlock, HeadersMode.SPDY_HEADERS); @@ -241,9 +238,8 @@ private void readHeaders(Handler handler, int flags, int length) throws IOExcept private void readWindowUpdate(Handler handler, int flags, int length) throws IOException { if (length != 8) throw ioException("TYPE_WINDOW_UPDATE length: %d != 8", length); - OkBuffers.require(source, buffer, 8, Deadline.NONE); - int w1 = buffer.readInt(); - int w2 = buffer.readInt(); + int w1 = source.readInt(); + int w2 = source.readInt(); int streamId = w1 & 0x7fffffff; long increment = w2 & 0x7fffffff; if (increment == 0) throw ioException("windowSizeIncrement was 0", increment); @@ -252,17 +248,15 @@ private void readWindowUpdate(Handler handler, int flags, int length) throws IOE private void readPing(Handler handler, int flags, int length) throws IOException { if (length != 4) throw ioException("TYPE_PING length: %d != 4", length); - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int id = buffer.readInt(); + int id = source.readInt(); boolean ack = client == ((id & 1) == 1); handler.ping(ack, id, 0); } private void readGoAway(Handler handler, int flags, int length) throws IOException { if (length != 8) throw ioException("TYPE_GOAWAY length: %d != 8", length); - OkBuffers.require(source, buffer, 8, Deadline.NONE); - int lastGoodStreamId = buffer.readInt() & 0x7fffffff; - int errorCodeInt = buffer.readInt(); + int lastGoodStreamId = source.readInt() & 0x7fffffff; + int errorCodeInt = source.readInt(); ErrorCode errorCode = ErrorCode.fromSpdyGoAway(errorCodeInt); if (errorCode == null) { throw ioException("TYPE_GOAWAY unexpected error code: %d", errorCodeInt); @@ -271,16 +265,14 @@ private void readGoAway(Handler handler, int flags, int length) throws IOExcepti } private void readSettings(Handler handler, int flags, int length) throws IOException { - OkBuffers.require(source, buffer, 4, Deadline.NONE); - int numberOfEntries = buffer.readInt(); + int numberOfEntries = source.readInt(); if (length != 4 + 8 * numberOfEntries) { throw ioException("TYPE_SETTINGS length: %d != 4 + 8 * %d", length, numberOfEntries); } - OkBuffers.require(source, buffer, 8 * numberOfEntries, Deadline.NONE); Settings settings = new Settings(); for (int i = 0; i < numberOfEntries; i++) { - int w1 = buffer.readInt(); - int value = buffer.readInt(); + int w1 = source.readInt(); + int value = source.readInt(); int idFlags = (w1 & 0xff000000) >>> 24; int id = w1 & 0xffffff; settings.set(id, idFlags, value); diff --git a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java index caa7bcf39354..1b96890cb720 100644 --- a/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java +++ b/okhttp-protocols/src/test/java/com/squareup/okhttp/internal/bytes/OkBufferTest.java @@ -384,7 +384,7 @@ private List moveBytesBetweenBuffers(String... contents) { source.writeUtf8(repeat('b', Segment.SIZE)); source.writeUtf8("c"); - InputStream in = OkBuffers.inputStream(source); + InputStream in = new BufferedSource(source, new OkBuffer()).inputStream(); assertEquals(0, in.available()); assertEquals(Segment.SIZE + 2, source.byteCount()); @@ -417,7 +417,7 @@ private List moveBytesBetweenBuffers(String... contents) { @Test public void inputStreamFromSourceBounds() throws IOException { OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', 100)); - InputStream in = OkBuffers.inputStream(source); + InputStream in = new BufferedSource(source, new OkBuffer()).inputStream(); try { in.read(new byte[100], 50, 51); fail(); @@ -568,60 +568,60 @@ private List moveBytesBetweenBuffers(String... contents) { } @Test public void requireTracksBufferFirst() throws Exception { - OkBuffer buffer = new OkBuffer(); - buffer.writeUtf8("aa"); - OkBuffer source = new OkBuffer(); source.writeUtf8("bb"); - OkBuffers.require(source, buffer, 2, Deadline.NONE); - assertEquals(2, buffer.byteCount()); + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); + bufferedSource.buffer.writeUtf8("aa"); + + bufferedSource.require(2, Deadline.NONE); + assertEquals(2, bufferedSource.buffer.byteCount()); assertEquals(2, source.byteCount()); } @Test public void requireIncludesBufferBytes() throws Exception { - OkBuffer buffer = new OkBuffer(); - buffer.writeUtf8("a"); - OkBuffer source = new OkBuffer(); source.writeUtf8("b"); - OkBuffers.require(source, buffer, 2, Deadline.NONE); - assertEquals("ab", buffer.readUtf8(2)); + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); + bufferedSource.buffer.writeUtf8("a"); + + bufferedSource.require(2, Deadline.NONE); + assertEquals("ab", bufferedSource.buffer.readUtf8(2)); } @Test public void requireInsufficientData() throws Exception { - OkBuffer buffer = new OkBuffer(); - OkBuffer source = new OkBuffer(); source.writeUtf8("a"); + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); + try { - OkBuffers.require(source, buffer, 2, Deadline.NONE); + bufferedSource.require(2, Deadline.NONE); fail(); } catch (EOFException expected) { } } @Test public void requireReadsOneSegmentAtATime() throws Exception { - OkBuffer buffer = new OkBuffer(); - OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', Segment.SIZE)); source.writeUtf8(repeat('b', Segment.SIZE)); - OkBuffers.require(source, buffer, 2, Deadline.NONE); + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); + + bufferedSource.require(2, Deadline.NONE); assertEquals(Segment.SIZE, source.byteCount()); - assertEquals(Segment.SIZE, buffer.byteCount()); + assertEquals(Segment.SIZE, bufferedSource.buffer.byteCount()); } @Test public void skipInsufficientData() throws Exception { - OkBuffer buffer = new OkBuffer(); - OkBuffer source = new OkBuffer(); source.writeUtf8("a"); + + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); try { - OkBuffers.require(source, buffer, 2, Deadline.NONE); + bufferedSource.skip(2, Deadline.NONE); fail(); } catch (EOFException expected) { } @@ -631,21 +631,21 @@ private List moveBytesBetweenBuffers(String... contents) { OkBuffer source = new OkBuffer(); source.writeUtf8(repeat('a', Segment.SIZE)); source.writeUtf8(repeat('b', Segment.SIZE)); - OkBuffer buffer = new OkBuffer(); - OkBuffers.skip(source, buffer, 2, Deadline.NONE); + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); + bufferedSource.skip(2, Deadline.NONE); assertEquals(Segment.SIZE, source.byteCount()); - assertEquals(Segment.SIZE - 2, buffer.byteCount()); + assertEquals(Segment.SIZE - 2, bufferedSource.buffer.byteCount()); } @Test public void skipTracksBufferFirst() throws Exception { - OkBuffer buffer = new OkBuffer(); - buffer.writeUtf8("aa"); - OkBuffer source = new OkBuffer(); source.writeUtf8("bb"); - OkBuffers.skip(source, buffer, 2, Deadline.NONE); - assertEquals(0, buffer.byteCount()); + BufferedSource bufferedSource = new BufferedSource(source, new OkBuffer()); + bufferedSource.buffer.writeUtf8("aa"); + + bufferedSource.skip(2, Deadline.NONE); + assertEquals(0, bufferedSource.buffer.byteCount()); assertEquals(2, source.byteCount()); }