Skip to content

Commit

Permalink
Merge pull request #496 from square/jwilson_0130_adapters
Browse files Browse the repository at this point in the history
Adapters that go the other way, to java.io.
  • Loading branch information
swankjesse committed Jan 31, 2014
2 parents d587a44 + 2d01579 commit 52804f6
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static int getDefaultPort(String protocol) {
return -1;
}

public static void checkOffsetAndCount(int arrayLength, int offset, int count) {
public static void checkOffsetAndCount(long arrayLength, long offset, long count) {
if ((offset | count) < 0 || offset > arrayLength || arrayLength - offset < count) {
throw new ArrayIndexOutOfBoundsException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Collections;
import java.util.List;

import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;

/**
* A collection of bytes in memory.
*
Expand Down Expand Up @@ -147,7 +149,7 @@ public String readUtf8(int byteCount) {
}

private byte[] readBytes(int byteCount) {
checkByteCount(byteCount);
checkOffsetAndCount(this.byteCount, 0, byteCount);

int offset = 0;
byte[] result = new byte[byteCount];
Expand Down Expand Up @@ -301,7 +303,7 @@ Segment writableSegment(int minimumCapacity) {
// yielding sink [51%, 91%, 30%] and source [62%, 82%].

if (source == this) throw new IllegalArgumentException("source == this");
source.checkByteCount(byteCount);
checkOffsetAndCount(source.byteCount, 0, byteCount);

while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
Expand Down Expand Up @@ -365,7 +367,6 @@ public long indexOf(byte b) throws IOException {
}

@Override public void flush(Deadline deadline) {
throw new UnsupportedOperationException("Cannot flush() an OkBuffer");
}

@Override public void close(Deadline deadline) {
Expand Down Expand Up @@ -400,15 +401,4 @@ List<Integer> segmentSizes() {
}
return new String(result);
}

/** Throws if this has fewer bytes than {@code requested}. */
void checkByteCount(long requested) {
if (requested < 0) {
throw new IllegalArgumentException("requested < 0: " + requested);
}
if (requested > this.byteCount) {
throw new IllegalArgumentException(
String.format("requested %s > available %s", requested, this.byteCount));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.InputStream;
import java.io.OutputStream;

import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;

public final class OkBuffers {
private OkBuffers() {
}
Expand All @@ -28,7 +30,7 @@ public static Sink sink(final OutputStream out) {
return new Sink() {
@Override public void write(OkBuffer source, long byteCount, Deadline deadline)
throws IOException {
source.checkByteCount(byteCount);
checkOffsetAndCount(source.byteCount, 0, byteCount);
while (byteCount > 0) {
deadline.throwIfReached();
Segment head = source.head;
Expand Down Expand Up @@ -60,6 +62,52 @@ public static Sink sink(final OutputStream out) {
};
}

/**
* Returns an output stream that writes to {@code sink}. This may buffer data
* by deferring writes.
*/
public static OutputStream outputStream(final Sink sink) {
return new OutputStream() {
final OkBuffer buffer = new OkBuffer(); // Buffer at most one segment of data.

@Override public void write(int b) throws IOException {
buffer.writeByte((byte) b);
if (buffer.byteCount == Segment.SIZE) {
sink.write(buffer, buffer.byteCount, Deadline.NONE);
}
}

@Override public void write(byte[] data, int offset, int byteCount) throws IOException {
checkOffsetAndCount(data.length, offset, byteCount);
int limit = offset + byteCount;
while (offset < limit) {
Segment onlySegment = buffer.writableSegment(1);
int toCopy = Math.min(limit - offset, Segment.SIZE - onlySegment.limit);
System.arraycopy(data, offset, onlySegment.data, onlySegment.limit, toCopy);
offset += toCopy;
onlySegment.limit += toCopy;
buffer.byteCount += toCopy;
if (buffer.byteCount == Segment.SIZE) {
sink.write(buffer, buffer.byteCount, Deadline.NONE);
}
}
}

@Override public void flush() throws IOException {
sink.write(buffer, buffer.byteCount, Deadline.NONE);
sink.flush(Deadline.NONE);
}

@Override public void close() throws IOException {
sink.close(Deadline.NONE);
}

@Override public String toString() {
return "outputStream(" + sink + ")";
}
};
}

/** Returns a source that reads from {@code in}. */
public static Source source(final InputStream in) {
return new Source() {
Expand All @@ -85,4 +133,57 @@ public static Source source(final InputStream in) {
}
};
}

/**
* Returns an input stream that reads from {@code source}. This may buffer
* data by reading extra data eagerly.
*/
public static InputStream inputStream(final Source source) {
return new InputStream() {
final OkBuffer buffer = new OkBuffer();

@Override public int read() throws IOException {
if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}
return buffer.readByte();
}

@Override public int read(byte[] data, int offset, int byteCount) throws IOException {
checkOffsetAndCount(data.length, offset, byteCount);

if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}

Segment head = buffer.head;
int toCopy = Math.min(byteCount, head.limit - head.pos);
System.arraycopy(head.data, head.pos, data, offset, toCopy);

head.pos += toCopy;
buffer.byteCount -= toCopy;

if (head.pos == head.limit) {
buffer.head = head.pop();
SegmentPool.INSTANCE.recycle(head);
}

return toCopy;
}

@Override public int available() throws IOException {
return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE);
}

@Override public void close() throws IOException {
super.close();
}

@Override public String toString() {
return "inputStream(" + source + ")";
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
Expand All @@ -41,7 +43,7 @@ public final class OkBufferTest {
try {
buffer.readUtf8(1);
fail();
} catch (IllegalArgumentException expected) {
} catch (ArrayIndexOutOfBoundsException expected) {
}
}

Expand Down Expand Up @@ -292,6 +294,26 @@ private List<Integer> moveBytesBetweenBuffers(String... contents) {
assertEquals("a" + repeat('b', 9998) + "c", out.toString("UTF-8"));
}

@Test public void outputStreamFromSink() throws Exception {
OkBuffer sink = new OkBuffer();
OutputStream out = OkBuffers.outputStream(sink);
out.write('a');
out.write(repeat('b', 9998).getBytes(UTF_8));
out.write('c');
out.flush();
assertEquals("a" + repeat('b', 9998) + "c", sink.readUtf8(10000));
}

@Test public void outputStreamFromSinkBounds() throws Exception {
OkBuffer sink = new OkBuffer();
OutputStream out = OkBuffers.outputStream(sink);
try {
out.write(new byte[100], 50, 51);
fail();
} catch (ArrayIndexOutOfBoundsException expected) {
}
}

@Test public void sourceFromInputStream() throws Exception {
InputStream in = new ByteArrayInputStream(
("a" + repeat('b', Segment.SIZE * 2) + "c").getBytes(UTF_8));
Expand All @@ -316,6 +338,62 @@ private List<Integer> moveBytesBetweenBuffers(String... contents) {
assertEquals(-1, source.read(sink, 1, Deadline.NONE));
}

@Test public void sourceFromInputStreamBounds() throws Exception {
Source source = OkBuffers.source(new ByteArrayInputStream(new byte[100]));
try {
source.read(new OkBuffer(), -1, Deadline.NONE);
fail();
} catch (IllegalArgumentException expected) {
}
}

@Test public void inputStreamFromSource() throws Exception {
OkBuffer source = new OkBuffer();
source.writeUtf8("a");
source.writeUtf8(repeat('b', Segment.SIZE));
source.writeUtf8("c");

InputStream in = OkBuffers.inputStream(source);
assertEquals(0, in.available());
assertEquals(Segment.SIZE + 2, source.byteCount());

// Reading one byte buffers a full segment.
assertEquals('a', in.read());
assertEquals(Segment.SIZE - 1, in.available());
assertEquals(2, source.byteCount());

// Reading as much as possible reads the rest of that buffered segment.
byte[] data = new byte[Segment.SIZE * 2];
assertEquals(Segment.SIZE - 1, in.read(data, 0, data.length));
assertEquals(repeat('b', Segment.SIZE - 1), new String(data, 0, Segment.SIZE - 1, UTF_8));
assertEquals(2, source.byteCount());

// Continuing to read buffers the next segment.
assertEquals('b', in.read());
assertEquals(1, in.available());
assertEquals(0, source.byteCount());

// Continuing to read reads from the buffer.
assertEquals('c', in.read());
assertEquals(0, in.available());
assertEquals(0, source.byteCount());

// Once we've exhausted the source, we're done.
assertEquals(-1, in.read());
assertEquals(0, source.byteCount());
}

@Test public void inputStreamFromSourceBounds() throws IOException {
OkBuffer source = new OkBuffer();
source.writeUtf8(repeat('a', 100));
InputStream in = OkBuffers.inputStream(source);
try {
in.read(new byte[100], 50, 51);
fail();
} catch (ArrayIndexOutOfBoundsException expected) {
}
}

@Test public void writeBytes() throws Exception {
OkBuffer data = new OkBuffer();
data.writeByte(0xab);
Expand Down

0 comments on commit 52804f6

Please sign in to comment.