Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapters that go the other way, to java.io. #496

Merged
merged 1 commit into from
Jan 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static int getDefaultPort(String protocol) {
return -1;
}

public static void checkOffsetAndCount(int arrayLength, int offset, int count) {
public static void checkOffsetAndCount(long arrayLength, long offset, long count) {
if ((offset | count) < 0 || offset > arrayLength || arrayLength - offset < count) {
throw new ArrayIndexOutOfBoundsException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Collections;
import java.util.List;

import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;

/**
* A collection of bytes in memory.
*
Expand Down Expand Up @@ -147,7 +149,7 @@ public String readUtf8(int byteCount) {
}

private byte[] readBytes(int byteCount) {
checkByteCount(byteCount);
checkOffsetAndCount(this.byteCount, 0, byteCount);

int offset = 0;
byte[] result = new byte[byteCount];
Expand Down Expand Up @@ -301,7 +303,7 @@ Segment writableSegment(int minimumCapacity) {
// yielding sink [51%, 91%, 30%] and source [62%, 82%].

if (source == this) throw new IllegalArgumentException("source == this");
source.checkByteCount(byteCount);
checkOffsetAndCount(source.byteCount, 0, byteCount);

while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
Expand Down Expand Up @@ -365,7 +367,6 @@ public long indexOf(byte b) throws IOException {
}

@Override public void flush(Deadline deadline) {
throw new UnsupportedOperationException("Cannot flush() an OkBuffer");
}

@Override public void close(Deadline deadline) {
Expand Down Expand Up @@ -400,15 +401,4 @@ List<Integer> segmentSizes() {
}
return new String(result);
}

/** Throws if this has fewer bytes than {@code requested}. */
void checkByteCount(long requested) {
if (requested < 0) {
throw new IllegalArgumentException("requested < 0: " + requested);
}
if (requested > this.byteCount) {
throw new IllegalArgumentException(
String.format("requested %s > available %s", requested, this.byteCount));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.InputStream;
import java.io.OutputStream;

import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;

public final class OkBuffers {
private OkBuffers() {
}
Expand All @@ -28,7 +30,7 @@ public static Sink sink(final OutputStream out) {
return new Sink() {
@Override public void write(OkBuffer source, long byteCount, Deadline deadline)
throws IOException {
source.checkByteCount(byteCount);
checkOffsetAndCount(source.byteCount, 0, byteCount);
while (byteCount > 0) {
deadline.throwIfReached();
Segment head = source.head;
Expand Down Expand Up @@ -60,6 +62,52 @@ public static Sink sink(final OutputStream out) {
};
}

/**
* Returns an output stream that writes to {@code sink}. This may buffer data
* by deferring writes.
*/
public static OutputStream outputStream(final Sink sink) {
return new OutputStream() {
final OkBuffer buffer = new OkBuffer(); // Buffer at most one segment of data.

@Override public void write(int b) throws IOException {
buffer.writeByte((byte) b);
if (buffer.byteCount == Segment.SIZE) {
sink.write(buffer, buffer.byteCount, Deadline.NONE);
}
}

@Override public void write(byte[] data, int offset, int byteCount) throws IOException {
checkOffsetAndCount(data.length, offset, byteCount);
int limit = offset + byteCount;
while (offset < limit) {
Segment onlySegment = buffer.writableSegment(1);
int toCopy = Math.min(limit - offset, Segment.SIZE - onlySegment.limit);
System.arraycopy(data, offset, onlySegment.data, onlySegment.limit, toCopy);
offset += toCopy;
onlySegment.limit += toCopy;
buffer.byteCount += toCopy;
if (buffer.byteCount == Segment.SIZE) {
sink.write(buffer, buffer.byteCount, Deadline.NONE);
}
}
}

@Override public void flush() throws IOException {
sink.write(buffer, buffer.byteCount, Deadline.NONE);
sink.flush(Deadline.NONE);
}

@Override public void close() throws IOException {
sink.close(Deadline.NONE);
}

@Override public String toString() {
return "outputStream(" + sink + ")";
}
};
}

/** Returns a source that reads from {@code in}. */
public static Source source(final InputStream in) {
return new Source() {
Expand All @@ -85,4 +133,57 @@ public static Source source(final InputStream in) {
}
};
}

/**
* Returns an input stream that reads from {@code source}. This may buffer
* data by reading extra data eagerly.
*/
public static InputStream inputStream(final Source source) {
return new InputStream() {
final OkBuffer buffer = new OkBuffer();

@Override public int read() throws IOException {
if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}
return buffer.readByte();
}

@Override public int read(byte[] data, int offset, int byteCount) throws IOException {
checkOffsetAndCount(data.length, offset, byteCount);

if (buffer.byteCount == 0) {
long count = source.read(buffer, Segment.SIZE, Deadline.NONE);
if (count == -1) return -1;
}

Segment head = buffer.head;
int toCopy = Math.min(byteCount, head.limit - head.pos);
System.arraycopy(head.data, head.pos, data, offset, toCopy);

head.pos += toCopy;
buffer.byteCount -= toCopy;

if (head.pos == head.limit) {
buffer.head = head.pop();
SegmentPool.INSTANCE.recycle(head);
}

return toCopy;
}

@Override public int available() throws IOException {
return (int) Math.min(buffer.byteCount, Integer.MAX_VALUE);
}

@Override public void close() throws IOException {
super.close();
}

@Override public String toString() {
return "inputStream(" + source + ")";
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
Expand All @@ -41,7 +43,7 @@ public final class OkBufferTest {
try {
buffer.readUtf8(1);
fail();
} catch (IllegalArgumentException expected) {
} catch (ArrayIndexOutOfBoundsException expected) {
}
}

Expand Down Expand Up @@ -292,6 +294,26 @@ private List<Integer> moveBytesBetweenBuffers(String... contents) {
assertEquals("a" + repeat('b', 9998) + "c", out.toString("UTF-8"));
}

@Test public void outputStreamFromSink() throws Exception {
OkBuffer sink = new OkBuffer();
OutputStream out = OkBuffers.outputStream(sink);
out.write('a');
out.write(repeat('b', 9998).getBytes(UTF_8));
out.write('c');
out.flush();
assertEquals("a" + repeat('b', 9998) + "c", sink.readUtf8(10000));
}

@Test public void outputStreamFromSinkBounds() throws Exception {
OkBuffer sink = new OkBuffer();
OutputStream out = OkBuffers.outputStream(sink);
try {
out.write(new byte[100], 50, 51);
fail();
} catch (ArrayIndexOutOfBoundsException expected) {
}
}

@Test public void sourceFromInputStream() throws Exception {
InputStream in = new ByteArrayInputStream(
("a" + repeat('b', Segment.SIZE * 2) + "c").getBytes(UTF_8));
Expand All @@ -316,6 +338,62 @@ private List<Integer> moveBytesBetweenBuffers(String... contents) {
assertEquals(-1, source.read(sink, 1, Deadline.NONE));
}

@Test public void sourceFromInputStreamBounds() throws Exception {
Source source = OkBuffers.source(new ByteArrayInputStream(new byte[100]));
try {
source.read(new OkBuffer(), -1, Deadline.NONE);
fail();
} catch (IllegalArgumentException expected) {
}
}

@Test public void inputStreamFromSource() throws Exception {
OkBuffer source = new OkBuffer();
source.writeUtf8("a");
source.writeUtf8(repeat('b', Segment.SIZE));
source.writeUtf8("c");

InputStream in = OkBuffers.inputStream(source);
assertEquals(0, in.available());
assertEquals(Segment.SIZE + 2, source.byteCount());

// Reading one byte buffers a full segment.
assertEquals('a', in.read());
assertEquals(Segment.SIZE - 1, in.available());
assertEquals(2, source.byteCount());

// Reading as much as possible reads the rest of that buffered segment.
byte[] data = new byte[Segment.SIZE * 2];
assertEquals(Segment.SIZE - 1, in.read(data, 0, data.length));
assertEquals(repeat('b', Segment.SIZE - 1), new String(data, 0, Segment.SIZE - 1, UTF_8));
assertEquals(2, source.byteCount());

// Continuing to read buffers the next segment.
assertEquals('b', in.read());
assertEquals(1, in.available());
assertEquals(0, source.byteCount());

// Continuing to read reads from the buffer.
assertEquals('c', in.read());
assertEquals(0, in.available());
assertEquals(0, source.byteCount());

// Once we've exhausted the source, we're done.
assertEquals(-1, in.read());
assertEquals(0, source.byteCount());
}

@Test public void inputStreamFromSourceBounds() throws IOException {
OkBuffer source = new OkBuffer();
source.writeUtf8(repeat('a', 100));
InputStream in = OkBuffers.inputStream(source);
try {
in.read(new byte[100], 50, 51);
fail();
} catch (ArrayIndexOutOfBoundsException expected) {
}
}

@Test public void writeBytes() throws Exception {
OkBuffer data = new OkBuffer();
data.writeByte(0xab);
Expand Down