Skip to content

Commit

Permalink
Merge pull request square#526 from square/jwilson_0212_okbuffer_in_sp…
Browse files Browse the repository at this point in the history
…dystream

Use OkBuffer in SpdyStream.
  • Loading branch information
swankjesse committed Feb 12, 2014
2 parents c40cb63 + c8507d0 commit 8720ab4
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public BufferedSource(Source source, OkBuffer buffer) {
* will block until there are bytes to read or the source is definitely
* exhausted.
*/
public boolean exhausted() throws IOException {
return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, Deadline.NONE) == -1;
public boolean exhausted(Deadline deadline) throws IOException {
return buffer.byteCount() == 0 && source.read(buffer, Segment.SIZE, deadline) == -1;
}

/**
Expand Down Expand Up @@ -84,11 +84,21 @@ public short readShort() throws IOException {
return buffer.readShort();
}

public int readShortLe() throws IOException {
require(2, Deadline.NONE);
return buffer.readShortLe();
}

public int readInt() throws IOException {
require(4, Deadline.NONE);
return buffer.readInt();
}

public int readIntLe() throws IOException {
require(4, Deadline.NONE);
return buffer.readIntLe();
}

/**
* Reads and discards {@code byteCount} bytes from {@code source} using {@code
* buffer} as a buffer. Throws an {@link EOFException} if the source is
Expand All @@ -105,6 +115,20 @@ public void skip(long byteCount, Deadline deadline) throws IOException {
}
}

/**
* Returns the index of {@code b} in the buffer, refilling it if necessary
* until it is found. This reads an unbounded number of bytes into the buffer.
*/
public long seek(byte b, Deadline deadline) throws IOException {
long start = 0;
long index;
while ((index = buffer.indexOf(b, start)) == -1) {
start = buffer.byteCount;
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
return index;
}

/** Returns an input stream that reads from this source. */
public InputStream inputStream() {
return new InputStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.squareup.okhttp.internal.bytes;

import java.io.EOFException;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Inflater;
Expand All @@ -34,22 +33,12 @@ public final class GzipSource implements Source {
/** The current section. Always progresses forward. */
private int section = SECTION_HEADER;

/**
* This buffer is carefully shared between this source and the InflaterSource
* it wraps. In particular, this source may read more bytes than necessary for
* the GZIP header; the InflaterSource will pick those up when it starts to
* read the compressed body. And the InflaterSource may read more bytes than
* necessary for the compressed body, and this source will pick those up for
* the GZIP trailer.
*/
private final OkBuffer buffer = new OkBuffer();

/**
* Our source should yield a GZIP header (which we consume directly), followed
* by deflated bytes (which we consume via an InflaterSource), followed by a
* GZIP trailer (which we also consume directly).
*/
private final Source source;
private final BufferedSource source;

/** The inflater used to decompress the deflated body. */
private final Inflater inflater;
Expand All @@ -65,8 +54,8 @@ public final class GzipSource implements Source {

public GzipSource(Source source) throws IOException {
this.inflater = new Inflater(true);
this.source = source;
this.inflaterSource = new InflaterSource(source, inflater, buffer);
this.source = new BufferedSource(source, new OkBuffer());
this.inflaterSource = new InflaterSource(this.source, inflater);
}

@Override public long read(OkBuffer sink, long byteCount, Deadline deadline) throws IOException {
Expand Down Expand Up @@ -108,54 +97,54 @@ private void consumeHeader(Deadline deadline) throws IOException {
// +---+---+---+---+---+---+---+---+---+---+
// |ID1|ID2|CM |FLG| MTIME |XFL|OS | (more-->)
// +---+---+---+---+---+---+---+---+---+---+
require(10, deadline);
byte flags = buffer.getByte(3);
source.require(10, deadline);
byte flags = source.buffer.getByte(3);
boolean fhcrc = ((flags >> FHCRC) & 1) == 1;
if (fhcrc) updateCrc(buffer, 0, 10);
if (fhcrc) updateCrc(source.buffer, 0, 10);

short id1id2 = buffer.readShort();
short id1id2 = source.readShort();
checkEqual("ID1ID2", (short) 0x1f8b, id1id2);
buffer.skip(8);
source.skip(8, deadline);

// Skip optional extra fields.
// +---+---+=================================+
// | XLEN |...XLEN bytes of "extra field"...| (more-->)
// +---+---+=================================+
if (((flags >> FEXTRA) & 1) == 1) {
require(2, deadline);
if (fhcrc) updateCrc(buffer, 0, 2);
int xlen = buffer.readShortLe() & 0xffff;
require(xlen, deadline);
if (fhcrc) updateCrc(buffer, 0, xlen);
buffer.skip(xlen);
source.require(2, deadline);
if (fhcrc) updateCrc(source.buffer, 0, 2);
int xlen = source.buffer.readShortLe() & 0xffff;
source.require(xlen, deadline);
if (fhcrc) updateCrc(source.buffer, 0, xlen);
source.skip(xlen, deadline);
}

// Skip an optional 0-terminated name.
// +=========================================+
// |...original file name, zero-terminated...| (more-->)
// +=========================================+
if (((flags >> FNAME) & 1) == 1) {
long index = OkBuffers.seek(buffer, (byte) 0, source, deadline);
if (fhcrc) updateCrc(buffer, 0, index + 1);
buffer.skip(index + 1);
long index = source.seek((byte) 0, deadline);
if (fhcrc) updateCrc(source.buffer, 0, index + 1);
source.buffer.skip(index + 1);
}

// Skip an optional 0-terminated comment.
// +===================================+
// |...file comment, zero-terminated...| (more-->)
// +===================================+
if (((flags >> FCOMMENT) & 1) == 1) {
long index = OkBuffers.seek(buffer, (byte) 0, source, deadline);
if (fhcrc) updateCrc(buffer, 0, index + 1);
buffer.skip(index + 1);
long index = source.seek((byte) 0, deadline);
if (fhcrc) updateCrc(source.buffer, 0, index + 1);
source.skip(index + 1, deadline);
}

// Confirm the optional header CRC.
// +---+---+
// | CRC16 |
// +---+---+
if (fhcrc) {
checkEqual("FHCRC", buffer.readShortLe(), (short) crc.getValue());
checkEqual("FHCRC", source.readShortLe(), (short) crc.getValue());
crc.reset();
}
}
Expand All @@ -165,9 +154,8 @@ private void consumeTrailer(Deadline deadline) throws IOException {
// +---+---+---+---+---+---+---+---+
// | CRC32 | ISIZE |
// +---+---+---+---+---+---+---+---+
require(8, deadline);
checkEqual("CRC", buffer.readIntLe(), (int) crc.getValue());
checkEqual("ISIZE", buffer.readIntLe(), inflater.getTotalOut());
checkEqual("CRC", source.readIntLe(), (int) crc.getValue());
checkEqual("ISIZE", source.readIntLe(), inflater.getTotalOut());
}

@Override public void close(Deadline deadline) throws IOException {
Expand All @@ -187,13 +175,6 @@ private void updateCrc(OkBuffer buffer, long offset, long byteCount) {
}
}

/** Fills the buffer with at least {@code byteCount} bytes. */
private void require(int byteCount, Deadline deadline) throws IOException {
while (buffer.byteCount < byteCount) {
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
}

private void checkEqual(String name, int expected, int actual) throws IOException {
if (actual != expected) {
throw new IOException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@

/** A source that inflates another source. */
public final class InflaterSource implements Source {
private final Source source;
private final BufferedSource source;
private final Inflater inflater;
/** This holds bytes read from the source, but not yet inflated. */
private final OkBuffer buffer;

/**
* When we call Inflater.setInput(), the inflater keeps our byte array until
Expand All @@ -36,15 +34,19 @@ public final class InflaterSource implements Source {
private boolean closed;

public InflaterSource(Source source, Inflater inflater) {
this(source, inflater, new OkBuffer());
this(new BufferedSource(source, new OkBuffer()), inflater);
}

InflaterSource(Source source, Inflater inflater, OkBuffer buffer) {
/**
* This package-private constructor shares a buffer with its trusted caller.
* In general we can't share a BufferedSource because the inflater holds input
* bytes until they are inflated.
*/
InflaterSource(BufferedSource source, Inflater inflater) {
if (source == null) throw new IllegalArgumentException("source == null");
if (inflater == null) throw new IllegalArgumentException("inflater == null");
this.source = source;
this.inflater = inflater;
this.buffer = buffer;
}

@Override public long read(
Expand Down Expand Up @@ -87,13 +89,11 @@ public boolean refill(Deadline deadline) throws IOException {
releaseInflatedBytes();
if (inflater.getRemaining() != 0) throw new IllegalStateException("?"); // TODO: possible?

// Refill the buffer with compressed data from the source.
if (buffer.byteCount == 0) {
if (source.read(buffer, Segment.SIZE, deadline) == -1) return true;
}
// If there are compressed bytes in the source, assign them to the inflater.
if (source.exhausted(deadline)) return true;

// Assign buffer bytes to the inflater.
Segment head = buffer.head;
Segment head = source.buffer.head;
bufferBytesHeldByInflater = head.limit - head.pos;
inflater.setInput(head.data, head.pos, bufferBytesHeldByInflater);
return false;
Expand All @@ -104,13 +104,12 @@ private void releaseInflatedBytes() {
if (bufferBytesHeldByInflater == 0) return;
int toRelease = bufferBytesHeldByInflater - inflater.getRemaining();
bufferBytesHeldByInflater -= toRelease;
buffer.skip(toRelease);
source.buffer.skip(toRelease);
}

@Override public void close(Deadline deadline) throws IOException {
if (closed) return;
inflater.end();
buffer.clear();
closed = true;
source.close(deadline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.squareup.okhttp.internal.bytes;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -26,22 +25,6 @@ public final class OkBuffers {
private OkBuffers() {
}

/**
* Returns the index of {@code b} in {@code buffer}, refilling it if necessary
* until it is found. This reads an unbounded number of bytes into {@code
* buffer}.
*/
public static long seek(OkBuffer buffer, byte b, Source source, Deadline deadline)
throws IOException {
long start = 0;
long index;
while ((index = buffer.indexOf(b, start)) == -1) {
start = buffer.byteCount;
if (source.read(buffer, Segment.SIZE, deadline) == -1) throw new EOFException();
}
return index;
}

/** Returns a sink that writes to {@code out}. */
public static Sink sink(final OutputStream out) {
return new Sink() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.squareup.okhttp.internal.spdy;

import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

/** Reads transport frames for SPDY/3 or HTTP/2. */
Expand All @@ -28,7 +28,8 @@ public interface FrameReader extends Closeable {
boolean nextFrame(Handler handler) throws IOException;

public interface Handler {
void data(boolean inFinished, int streamId, InputStream in, int length) throws IOException;
void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException;

/**
* Create or update incoming headers, creating the corresponding streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.squareup.okhttp.internal.BitArray;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.bytes.Deadline;
import com.squareup.okhttp.internal.bytes.OkBuffer;
import com.squareup.okhttp.internal.bytes.Source;
import java.io.IOException;
Expand Down Expand Up @@ -180,7 +181,7 @@ private int evictToRecoverBytes(int bytesToRecover) {
* set of emitted headers.
*/
void readHeaders() throws IOException {
while (!source.exhausted()) {
while (!source.exhausted(Deadline.NONE)) {
int b = source.readByte() & 0xff;
if (b == 0x80) { // 10000000
clearReferenceSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void readData(Handler handler, short length, byte flags, int streamId)
throws IOException {
boolean inFinished = (flags & FLAG_END_STREAM) != 0;
// TODO: checkState open or half-closed (local) or raise STREAM_CLOSED
handler.data(inFinished, streamId, source.inputStream(), length);
handler.data(inFinished, streamId, source, length);
}

private void readPriority(Handler handler, short length, byte flags, int streamId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ static final class Reader implements FrameReader {
} else {
int streamId = w1 & 0x7fffffff;
boolean inFinished = (flags & FLAG_FIN) != 0;
handler.data(inFinished, streamId, source.inputStream(), length);
handler.data(inFinished, streamId, source, length);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import com.squareup.okhttp.Protocol;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.bytes.BufferedSource;
import com.squareup.okhttp.internal.bytes.ByteString;
import com.squareup.okhttp.internal.bytes.Deadline;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -526,15 +528,15 @@ private Reader() {
}
}

@Override public void data(boolean inFinished, int streamId, InputStream in, int length)
@Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
throws IOException {
SpdyStream dataStream = getStream(streamId);
if (dataStream == null) {
writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
Util.skipByReading(in, length);
source.skip(length, Deadline.NONE);
return;
}
dataStream.receiveData(in, length);
dataStream.receiveData(source, length);
if (inFinished) {
dataStream.receiveFin();
}
Expand Down
Loading

0 comments on commit 8720ab4

Please sign in to comment.