Skip to content

Commit

Permalink
Merge pull request square#524 from square/jwilson_0211_buffered_source
Browse files Browse the repository at this point in the history
BufferedSource.
  • Loading branch information
Adrian Cole committed Feb 12, 2014
2 parents 545807e + 9c6a433 commit c40cb63
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 227 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.internal.bytes;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;

/**
* A source that keeps a buffer internally so that callers can do small reads
* without a performance penalty.
*/
public final class BufferedSource implements Source {
public final OkBuffer buffer;
public final Source source;
private boolean closed;

public BufferedSource(Source source, OkBuffer buffer) {
this.buffer = buffer;
this.source = source;
}

@Override public long read(OkBuffer sink, long byteCount, Deadline deadline)
throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");

if (buffer.byteCount == 0) {
long read = source.read(buffer, Segment.SIZE, deadline);
if (read == -1) return -1;
}

long toRead = Math.min(byteCount, buffer.byteCount);
return buffer.read(sink, toRead, deadline);
}

/**
* Returns true if there are no more bytes in the buffer or the source. This
* will block until there are bytes to read or the source is definitely
* exhausted.
*/
public boolean exhausted() throws IOException {
return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, Deadline.NONE) == -1;
}

/**
* Returns when the buffer contains at least {@code byteCount} bytes. Throws
* an {@link EOFException} if the source is exhausted before the required
* bytes can be read.
*/
void require(long byteCount, Deadline deadline) throws IOException {
while (buffer.byteCount < byteCount) {
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
}

public byte readByte() throws IOException {
require(1, Deadline.NONE);
return buffer.readByte();
}

public ByteString readByteString(int byteCount) throws IOException {
require(byteCount, Deadline.NONE);
return buffer.readByteString(byteCount);
}

public short readShort() throws IOException {
require(2, Deadline.NONE);
return buffer.readShort();
}

public int readInt() throws IOException {
require(4, Deadline.NONE);
return buffer.readInt();
}

/**
* Reads and discards {@code byteCount} bytes from {@code source} using {@code
* buffer} as a buffer. Throws an {@link EOFException} if the source is
* exhausted before the requested bytes can be skipped.
*/
public void skip(long byteCount, Deadline deadline) throws IOException {
while (byteCount > 0) {
if (buffer.byteCount == 0 && source.read(buffer, Segment.SIZE, deadline) == -1) {
throw new EOFException();
}
long toSkip = Math.min(byteCount, buffer.byteCount());
buffer.skip(toSkip);
byteCount -= toSkip;
}
}

/** Returns an input stream that reads from this source. */
public InputStream inputStream() {
return new InputStream() {
@Override public int read() throws IOException {
if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}
return buffer.readByte() & 0xff;
}

@Override public int read(byte[] data, int offset, int byteCount) throws IOException {
checkOffsetAndCount(data.length, offset, byteCount);

if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}

Segment head = buffer.head;
int toCopy = Math.min(byteCount, head.limit - head.pos);
System.arraycopy(head.data, head.pos, data, offset, toCopy);

head.pos += toCopy;
buffer.byteCount -= toCopy;

if (head.pos == head.limit) {
buffer.head = head.pop();
SegmentPool.INSTANCE.recycle(head);
}

return toCopy;
}

@Override public int available() throws IOException {
return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE);
}

@Override public void close() throws IOException {
BufferedSource.this.close(Deadline.NONE);
}

@Override public String toString() {
return BufferedSource.this.toString() + ".inputStream()";
}
};
}

@Override public void close(Deadline deadline) throws IOException {
if (closed) return;
closed = true;
source.close(deadline);
buffer.clear();
}

@Override public String toString() {
return "BufferedSource(" + source + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ Segment writableSegment(int minimumCapacity) {
// an equivalent buffer [30%, 62%, 82%] and then move the head segment,
// yielding sink [51%, 91%, 30%] and source [62%, 82%].

if (source == this) throw new IllegalArgumentException("source == this");
if (source == this) {
throw new IllegalArgumentException("source == this");
}
checkOffsetAndCount(source.byteCount, 0, byteCount);

while (byteCount > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,6 @@ public static long seek(OkBuffer buffer, byte b, Source source, Deadline deadlin
return index;
}

/**
* Returns when {@code sink} contains at least {@code byteCount} bytes. Throws
* an {@link EOFException} if the source is exhausted before the requested
* bytes can be read.
*/
public static void require(Source source, OkBuffer sink, long byteCount, Deadline deadline)
throws IOException {
while (sink.byteCount < byteCount) {
if (source.read(sink, Segment.SIZE, deadline) == -1) throw new EOFException();
}
}

/**
* Reads and discards {@code byteCount} bytes from {@code source} using {@code
* buffer} as a buffer. Throws an {@link EOFException} if the source is
* exhausted before the requested bytes can be skipped.
*/
public static void skip(Source source, OkBuffer buffer, long byteCount, Deadline deadline)
throws IOException {
while (byteCount > 0) {
if (buffer.byteCount == 0 && source.read(buffer, Segment.SIZE, deadline) == -1) {
throw new EOFException();
}
long toSkip = Math.min(byteCount, buffer.byteCount());
buffer.skip(toSkip);
byteCount -= toSkip;
}
}

/** Returns a sink that writes to {@code out}. */
public static Sink sink(final OutputStream out) {
return new Sink() {
Expand Down Expand Up @@ -180,65 +151,4 @@ public static Source source(final InputStream in) {
}
};
}

/**
* Returns an input stream that reads from {@code source}. This may buffer
* data by reading extra data eagerly.
*/
public static InputStream inputStream(final Source source) {
return inputStream(source, new OkBuffer());
}

/**
* Returns a buffered input stream that reads from {@code source}, with {@code
* buffer} as a buffer. Bytes are drawn from {@code buffer}, which is refilled
* from {@code source} when it is empty. This may read extra data eagerly into
* {@code buffer}.
*/
public static InputStream inputStream(final Source source, final OkBuffer buffer) {
return new InputStream() {
@Override public int read() throws IOException {
if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}
return buffer.readByte() & 0xff;
}

@Override public int read(byte[] data, int offset, int byteCount) throws IOException {
checkOffsetAndCount(data.length, offset, byteCount);

if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}

Segment head = buffer.head;
int toCopy = Math.min(byteCount, head.limit - head.pos);
System.arraycopy(head.data, head.pos, data, offset, toCopy);

head.pos += toCopy;
buffer.byteCount -= toCopy;

if (head.pos == head.limit) {
buffer.head = head.pop();
SegmentPool.INSTANCE.recycle(head);
}

return toCopy;
}

@Override public int available() throws IOException {
return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE);
}

@Override public void close() throws IOException {
source.close(Deadline.NONE);
}

@Override public String toString() {
return "inputStream(" + source + ")";
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.squareup.okhttp.internal.spdy;

import com.squareup.okhttp.internal.BitArray;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.bytes.Deadline;
import com.squareup.okhttp.internal.bytes.OkBuffer;
import com.squareup.okhttp.internal.bytes.OkBuffers;
import com.squareup.okhttp.internal.bytes.Source;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -100,8 +99,7 @@ static final class Reader {
private final Huffman.Codec huffmanCodec;

private final List<Header> emittedHeaders = new ArrayList<Header>();
private final Source source;
private final OkBuffer buffer = new OkBuffer();
private final BufferedSource source;
private int maxHeaderTableByteCount;

// Visible for testing.
Expand All @@ -127,7 +125,7 @@ static final class Reader {
Reader(boolean client, int maxHeaderTableByteCount, Source source) {
this.huffmanCodec = client ? Huffman.Codec.RESPONSE : Huffman.Codec.REQUEST;
this.maxHeaderTableByteCount = maxHeaderTableByteCount;
this.source = source;
this.source = new BufferedSource(source, new OkBuffer());
}

int maxHeaderTableByteCount() {
Expand Down Expand Up @@ -182,9 +180,8 @@ private int evictToRecoverBytes(int bytesToRecover) {
* set of emitted headers.
*/
void readHeaders() throws IOException {
while (buffer.byteCount() > 0
|| source.read(buffer, 2048, Deadline.NONE) != -1) {
int b = buffer.readByte() & 0xff;
while (!source.exhausted()) {
int b = source.readByte() & 0xff;
if (b == 0x80) { // 10000000
clearReferenceSet();
} else if ((b & 0x80) == 0x80) { // 1NNNNNNN
Expand Down Expand Up @@ -335,8 +332,7 @@ private void insertIntoHeaderTable(int index, Header entry) {
}

private int readByte() throws IOException {
OkBuffers.require(source, buffer, 1, Deadline.NONE);
return buffer.readByte() & 0xff;
return source.readByte() & 0xff;
}

int readInt(int firstByte, int prefixMask) throws IOException {
Expand Down Expand Up @@ -375,8 +371,7 @@ ByteString readByteString(boolean asciiLowercase) throws IOException {
huffmanDecode = true;
}

OkBuffers.require(source, buffer, length, Deadline.NONE);
ByteString byteString = buffer.readByteString(length);
ByteString byteString = source.readByteString(length);

if (huffmanDecode) {
byteString = huffmanCodec.decode(byteString); // TODO: streaming Huffman!
Expand Down
Loading

0 comments on commit c40cb63

Please sign in to comment.