diff --git a/okhttp-tests/src/test/java/okhttp3/internal/cache2/RelayTest.java b/okhttp-tests/src/test/java/okhttp3/internal/cache2/RelayTest.java new file mode 100644 index 000000000000..cbffa87d0992 --- /dev/null +++ b/okhttp-tests/src/test/java/okhttp3/internal/cache2/RelayTest.java @@ -0,0 +1,255 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.cache2; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import okio.Buffer; +import okio.BufferedSink; +import okio.BufferedSource; +import okio.ByteString; +import okio.Okio; +import okio.Pipe; +import okio.Source; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public final class RelayTest { + @Rule public final TemporaryFolder tempDir = new TemporaryFolder(); + + private ExecutorService executor = Executors.newCachedThreadPool(); + private ByteString metadata = ByteString.encodeUtf8("great metadata!"); + private File file; + + @Before public void setUp() throws Exception { + file = tempDir.newFile(); + } + + @After public void tearDown() throws Exception { + executor.shutdown(); + } + + @Test public void singleSource() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcdefghijklm"); + + Relay relay = Relay.edit(file, upstream, metadata, 1024); + Source source = relay.newSource(); + Buffer sourceBuffer = new Buffer(); + + assertEquals(5, source.read(sourceBuffer, 5)); + assertEquals("abcde", sourceBuffer.readUtf8()); + + assertEquals(8, source.read(sourceBuffer, 1024)); + assertEquals("fghijklm", sourceBuffer.readUtf8()); + + assertEquals(-1, source.read(sourceBuffer, 1024)); + assertEquals(0, sourceBuffer.size()); + + source.close(); + assertTrue(relay.isClosed()); + assertFile(Relay.PREFIX_CLEAN, 13L, metadata.size(), "abcdefghijklm", metadata); + } + + @Test public void multipleSources() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcdefghijklm"); + + Relay relay = Relay.edit(file, upstream, metadata, 1024); + BufferedSource source1 = Okio.buffer(relay.newSource()); + BufferedSource source2 = Okio.buffer(relay.newSource()); + + assertEquals("abcdefghijklm", source1.readUtf8()); + assertEquals("abcdefghijklm", source2.readUtf8()); + source1.close(); + source2.close(); + assertTrue(relay.isClosed()); + + assertFile(Relay.PREFIX_CLEAN, 13L, metadata.size(), "abcdefghijklm", metadata); + } + + @Test public void readFromBuffer() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcdefghij"); + + Relay relay = Relay.edit(file, upstream, metadata, 5); + BufferedSource source1 = Okio.buffer(relay.newSource()); + BufferedSource source2 = Okio.buffer(relay.newSource()); + + assertEquals("abcde", source1.readUtf8(5)); + assertEquals("abcde", source2.readUtf8(5)); + assertEquals("fghij", source2.readUtf8(5)); + assertEquals("fghij", source1.readUtf8(5)); + assertTrue(source1.exhausted()); + assertTrue(source2.exhausted()); + source1.close(); + source2.close(); + assertTrue(relay.isClosed()); + + assertFile(Relay.PREFIX_CLEAN, 10L, metadata.size(), "abcdefghij", metadata); + } + + @Test public void readFromFile() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcdefghijklmnopqrst"); + + Relay relay = Relay.edit(file, upstream, metadata, 5); + BufferedSource source1 = Okio.buffer(relay.newSource()); + BufferedSource source2 = Okio.buffer(relay.newSource()); + + assertEquals("abcdefghij", source1.readUtf8(10)); + assertEquals("abcdefghij", source2.readUtf8(10)); + assertEquals("klmnopqrst", source2.readUtf8(10)); + assertEquals("klmnopqrst", source1.readUtf8(10)); + assertTrue(source1.exhausted()); + assertTrue(source2.exhausted()); + source1.close(); + source2.close(); + assertTrue(relay.isClosed()); + + assertFile(Relay.PREFIX_CLEAN, 20L, metadata.size(), "abcdefghijklmnopqrst", metadata); + } + + @Test public void readAfterEdit() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcdefghij"); + + Relay relay1 = Relay.edit(file, upstream, metadata, 5); + BufferedSource source1 = Okio.buffer(relay1.newSource()); + assertEquals("abcdefghij", source1.readUtf8(10)); + assertTrue(source1.exhausted()); + source1.close(); + assertTrue(relay1.isClosed()); + + // Since relay1 is closed, new sources cannot be created. + assertNull(relay1.newSource()); + + Relay relay2 = Relay.read(file); + assertEquals(metadata, relay2.metadata()); + BufferedSource source2 = Okio.buffer(relay2.newSource()); + assertEquals("abcdefghij", source2.readUtf8(10)); + assertTrue(source2.exhausted()); + source2.close(); + assertTrue(relay2.isClosed()); + + // Since relay2 is closed, new sources cannot be created. + assertNull(relay2.newSource()); + + assertFile(Relay.PREFIX_CLEAN, 10L, metadata.size(), "abcdefghij", metadata); + } + + @Test public void closeBeforeExhaustLeavesDirtyFile() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcdefghij"); + + Relay relay1 = Relay.edit(file, upstream, metadata, 5); + BufferedSource source1 = Okio.buffer(relay1.newSource()); + assertEquals("abcdefghij", source1.readUtf8(10)); + source1.close(); // Not exhausted! + assertTrue(relay1.isClosed()); + + try { + Relay.read(file); + fail(); + } catch (IOException expected) { + assertEquals("unreadable cache file", expected.getMessage()); + } + + assertFile(Relay.PREFIX_DIRTY, -1L, -1, null, null); + } + + @Test public void redundantCallsToCloseAreIgnored() throws Exception { + Buffer upstream = new Buffer(); + upstream.writeUtf8("abcde"); + + Relay relay = Relay.edit(file, upstream, metadata, 1024); + Source source1 = relay.newSource(); + Source source2 = relay.newSource(); + + source1.close(); + source1.close(); // Unnecessary. Shouldn't decrement the reference count. + assertFalse(relay.isClosed()); + + source2.close(); + assertTrue(relay.isClosed()); + assertFile(Relay.PREFIX_DIRTY, -1L, -1, null, null); + } + + @Test public void racingReaders() throws Exception { + Pipe pipe = new Pipe(1024); + BufferedSink sink = Okio.buffer(pipe.sink()); + + Relay relay = Relay.edit(file, pipe.source(), metadata, 5); + + Future future1 = executor.submit(sourceReader(relay.newSource())); + Future future2 = executor.submit(sourceReader(relay.newSource())); + + Thread.sleep(500); + sink.writeUtf8("abcdefghij"); + + Thread.sleep(500); + sink.writeUtf8("klmnopqrst"); + sink.close(); + + assertEquals(ByteString.encodeUtf8("abcdefghijklmnopqrst"), future1.get()); + assertEquals(ByteString.encodeUtf8("abcdefghijklmnopqrst"), future2.get()); + + assertTrue(relay.isClosed()); + + assertFile(Relay.PREFIX_CLEAN, 20L, metadata.size(), "abcdefghijklmnopqrst", metadata); + } + + /** Returns a callable that reads all of source, closes it, and returns the bytes. */ + private Callable sourceReader(final Source source) { + return new Callable() { + @Override public ByteString call() throws Exception { + Buffer buffer = new Buffer(); + while (source.read(buffer, 16384) != -1) { + } + source.close(); + return buffer.readByteString(); + } + }; + } + + private void assertFile(ByteString prefix, long upstreamSize, int metadataSize, String upstream, + ByteString metadata) throws IOException { + BufferedSource source = Okio.buffer(Okio.source(file)); + assertEquals(prefix, source.readByteString(prefix.size())); + assertEquals(upstreamSize, source.readLong()); + assertEquals(metadataSize, source.readLong()); + if (upstream != null) { + assertEquals(upstream, source.readUtf8(upstreamSize)); + } + if (metadata != null) { + assertEquals(metadata, source.readByteString(metadataSize)); + } + source.close(); + } +} diff --git a/okhttp/src/main/java/okhttp3/internal/cache2/Relay.java b/okhttp/src/main/java/okhttp3/internal/cache2/Relay.java new file mode 100644 index 000000000000..b2db592d1e3d --- /dev/null +++ b/okhttp/src/main/java/okhttp3/internal/cache2/Relay.java @@ -0,0 +1,361 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.cache2; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import okio.Buffer; +import okio.ByteString; +import okio.Source; +import okio.Timeout; + +import static okhttp3.internal.Util.closeQuietly; + +/** + * Replicates a single upstream source into multiple downstream sources. Each downstream source + * returns the same bytes as the upstream source. Downstream sources may read data either as it + * is returned by upstream, or after the upstream source has been exhausted. + * + *

As bytes are returned from upstream they are written to a local file. Downstream sources read + * from this file as necessary. + * + *

This class also keeps a small buffer of bytes recently read from upstream. This is intended to + * save a small amount of file I/O and data copying. + */ +// TODO(jwilson): what to do about timeouts? They could be different and unfortunately when any +// timeout is hit we like to tear down the whole stream. +final class Relay { + private static final int SOURCE_UPSTREAM = 1; + private static final int SOURCE_FILE = 2; + + static final ByteString PREFIX_CLEAN = ByteString.encodeUtf8("OkHttp cache v1\n"); + static final ByteString PREFIX_DIRTY = ByteString.encodeUtf8("OkHttp DIRTY :(\n"); + private static final long FILE_HEADER_SIZE = 32L; + + /** + * Read/write persistence of the upstream source and its metadata. Its layout is as follows: + * + *

    + *
  • 16 bytes: either {@code OkHttp cache v1\n} if the persisted file is complete. This is + * another sequence of bytes if the file is incomplete and should not be used. + *
  • 8 bytes: n: upstream data size + *
  • 8 bytes: m: metadata size + *
  • n bytes: upstream data + *
  • m bytes: metadata + *
+ * + *

This is closed and assigned to null when the last source is closed and no further sources + * are permitted. + */ + private RandomAccessFile file; + + /** The thread that currently has access to upstream. Possibly null. Guarded by this. */ + private Thread upstreamReader; + + /** + * Null once the file has a complete copy of the upstream bytes. Only the {@code upstreamReader} + * thread may access this source. + */ + private Source upstream; + + /** + * A buffer for {@code upstreamReader} to use when pulling bytes from upstream. Only the {@code + * upstreamReader} thread may access this buffer. + */ + private final Buffer upstreamBuffer = new Buffer(); + + /** The number of bytes consumed from {@link #upstream}. Guarded by this. */ + private long upstreamPos; + + /** True if there are no further bytes to read from {@code upstream}. Guarded by this. */ + private boolean complete; + + /** User-supplied additional data persisted with the source data. */ + private final ByteString metadata; + + /** + * The most recently read bytes from {@link #upstream}. This is a suffix of {@link #file}. Guarded + * by this. + */ + private final Buffer buffer = new Buffer(); + + /** The maximum size of {@code buffer}. */ + private final long bufferMaxSize; + + /** + * Reference count of the number of active sources reading this stream. When decremented to 0 + * resources are released and all following calls to {@link #newSource} return null. Guarded by + * this. + */ + private int sourceCount; + + private Relay(RandomAccessFile file, Source upstream, long upstreamPos, ByteString metadata, + long bufferMaxSize) { + this.file = file; + this.upstream = upstream; + this.complete = upstream == null; + this.upstreamPos = upstreamPos; + this.metadata = metadata; + this.bufferMaxSize = bufferMaxSize; + } + + /** + * Creates a new relay that reads a live stream from {@code upstream}, using {@code file} to share + * that data with other sources. + * + *

Warning: callers to this method must immediately call {@link #newSource} to + * create a source and close that when they're done. Otherwise a handle to {@code file} will be + * leaked. + */ + public static Relay edit( + File file, Source upstream, ByteString metadata, long bufferMaxSize) throws IOException { + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); + Relay result = new Relay(randomAccessFile, upstream, 0L, metadata, bufferMaxSize); + + // Write a dirty header. That way if we crash we won't attempt to recover this. + randomAccessFile.setLength(0L); + result.writeHeader(PREFIX_DIRTY, -1L, -1L); + + return result; + } + + /** + * Creates a relay that reads a recorded stream from {@code file}. + * + *

Warning: callers to this method must immediately call {@link #newSource} to + * create a source and close that when they're done. Otherwise a handle to {@code file} will be + * leaked. + */ + public static Relay read(File file) throws IOException { + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); + FileOperator fileOperator = new FileOperator(randomAccessFile.getChannel()); + + // Read the header. + Buffer header = new Buffer(); + fileOperator.read(0, header, FILE_HEADER_SIZE); + ByteString prefix = header.readByteString(PREFIX_CLEAN.size()); + if (!prefix.equals(PREFIX_CLEAN)) throw new IOException("unreadable cache file"); + long upstreamSize = header.readLong(); + long metadataSize = header.readLong(); + + // Read the metadata. + Buffer metadataBuffer = new Buffer(); + fileOperator.read(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadataSize); + ByteString metadata = metadataBuffer.readByteString(); + + // Return the result. + return new Relay(randomAccessFile, null, upstreamSize, metadata, 0L); + } + + private void writeHeader( + ByteString prefix, long upstreamSize, long metadataSize) throws IOException { + Buffer header = new Buffer(); + header.write(prefix); + header.writeLong(upstreamSize); + header.writeLong(metadataSize); + if (header.size() != FILE_HEADER_SIZE) throw new IllegalArgumentException(); + + FileOperator fileOperator = new FileOperator(file.getChannel()); + fileOperator.write(0, header, FILE_HEADER_SIZE); + } + + private void writeMetadata(long upstreamSize) throws IOException { + Buffer metadataBuffer = new Buffer(); + metadataBuffer.write(metadata); + + FileOperator fileOperator = new FileOperator(file.getChannel()); + fileOperator.write(FILE_HEADER_SIZE + upstreamSize, metadataBuffer, metadata.size()); + } + + void commit(long upstreamSize) throws IOException { + // Write metadata to the end of the file. + writeMetadata(upstreamSize); + file.getChannel().force(false); + + // Once everything else is in place we can swap the dirty header for a clean one. + writeHeader(PREFIX_CLEAN, upstreamSize, metadata.size()); + file.getChannel().force(false); + + // This file is complete. + synchronized (Relay.this) { + complete = true; + } + + closeQuietly(upstream); + upstream = null; + } + + boolean isClosed() { + return file == null; + } + + public ByteString metadata() { + return metadata; + } + + /** + * Returns a new source that returns the same bytes as upstream. Returns null if this relay has + * been closed and no further sources are possible. In that case callers should retry after + * building a new relay with {@link #read}. + */ + public Source newSource() { + synchronized (Relay.this) { + if (file == null) return null; + sourceCount++; + } + + return new RelaySource(); + } + + class RelaySource implements Source { + private final Timeout timeout = new Timeout(); + + /** The operator to read and write the shared file. Null if this source is closed. */ + private FileOperator fileOperator = new FileOperator(file.getChannel()); + + /** The next byte to read. This is always less than or equal to {@code upstreamPos}. */ + private long sourcePos; + + /** + * Selects where to find the bytes for a read and read them. This is one of three sources. + * + *

Upstream:

+ * In this case the current thread is assigned as the upstream reader. We read bytes from + * upstream and copy them to both the file and to the buffer. Finally we release the upstream + * reader lock and return the new bytes. + * + *

The file

+ * In this case we copy bytes from the file to the {@code sink}. + * + *

The buffer

+ * In this case the bytes are immediately copied into {@code sink} and the number of bytes + * copied is returned. + * + *

If upstream would be selected but another thread is already reading upstream this will + * block until that read completes. It is possible to time out while waiting for that. + */ + @Override public long read(Buffer sink, long byteCount) throws IOException { + if (fileOperator == null) throw new IllegalStateException("closed"); + + long upstreamPos; + int source; + + selectSource: + synchronized (Relay.this) { + // We need new data from upstream. + while (sourcePos == (upstreamPos = Relay.this.upstreamPos)) { + // No more data upstream. We're done. + if (complete) return -1L; + + // Another thread is already reading. Wait for that. + if (upstreamReader != null) { + timeout.waitUntilNotified(Relay.this); + continue; + } + + // We will do the read. + upstreamReader = Thread.currentThread(); + source = SOURCE_UPSTREAM; + break selectSource; + } + + long bufferPos = upstreamPos - buffer.size(); + + // Bytes of the read precede the buffer. Read from the file. + if (sourcePos < bufferPos) { + source = SOURCE_FILE; + break selectSource; + } + + // The buffer has the data we need. Read from there and return immediately. + long bytesToRead = Math.min(byteCount, upstreamPos - sourcePos); + buffer.copyTo(sink, sourcePos - bufferPos, bytesToRead); + sourcePos += bytesToRead; + return bytesToRead; + } + + // Read from the file. + if (source == SOURCE_FILE) { + long bytesToRead = Math.min(byteCount, upstreamPos - sourcePos); + fileOperator.read(FILE_HEADER_SIZE + sourcePos, sink, bytesToRead); + sourcePos += bytesToRead; + return bytesToRead; + } + + // Read from upstream. This always reads a full buffer: that might be more than what the + // current call to Source.read() has requested. + try { + long upstreamBytesRead = upstream.read(upstreamBuffer, bufferMaxSize); + + // If we've exhausted upstream, we're done. + if (upstreamBytesRead == -1L) { + commit(upstreamPos); + return -1L; + } + + // Update this source and prepare this call's result. + long bytesRead = Math.min(upstreamBytesRead, byteCount); + upstreamBuffer.copyTo(sink, 0, bytesRead); + sourcePos += bytesRead; + + // Append the upstream bytes to the file. + fileOperator.write( + FILE_HEADER_SIZE + upstreamPos, upstreamBuffer.clone(), upstreamBytesRead); + + synchronized (Relay.this) { + // Append new upstream bytes into the buffer. Trim it to its max size. + buffer.write(upstreamBuffer, upstreamBytesRead); + if (buffer.size() > bufferMaxSize) { + buffer.skip(buffer.size() - bufferMaxSize); + } + + // Now that the file and buffer have bytes, adjust upstreamPos. + Relay.this.upstreamPos += upstreamBytesRead; + } + + return bytesRead; + } finally { + synchronized (Relay.this) { + upstreamReader = null; + Relay.this.notifyAll(); + } + } + } + + @Override public Timeout timeout() { + return timeout; + } + + @Override public void close() throws IOException { + if (fileOperator == null) return; // Already closed. + fileOperator = null; + + RandomAccessFile fileToClose = null; + synchronized (Relay.this) { + sourceCount--; + if (sourceCount == 0) { + fileToClose = file; + file = null; + } + } + + if (fileToClose != null) { + closeQuietly(fileToClose); + } + } + } +}