Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.x] Remote: Add support for compression on gRPC cache #14277

Merged
merged 2 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pkg_tar(
"@com_google_protobuf//:protobuf_java",
"@com_google_protobuf//:protobuf_java_util",
"@com_google_protobuf//:protobuf_javalite",
"@zstd-jni//:zstd-jni",
],
package_dir = "derived/jars",
strip_prefix = "external",
Expand Down
8 changes: 8 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ dist_http_archive(
patch_cmds_win = EXPORT_WORKSPACE_IN_BUILD_FILE_WIN,
)

# zstd-jni: JNI bindings for the zstd compression library. The archive URL,
# patches, and checksum are declared in distdir_deps.bzl; the custom BUILD
# file lives under //third_party.
dist_http_archive(
    name = "zstd-jni",
    patch_cmds = EXPORT_WORKSPACE_IN_BUILD_BAZEL_FILE,
    patch_cmds_win = EXPORT_WORKSPACE_IN_BUILD_BAZEL_FILE_WIN,
    build_file = "//third_party:zstd-jni/zstd-jni.BUILD",
    # Trailing comma added for consistency with the sibling archive stanzas.
    strip_prefix = "zstd-jni-1.5.0-4",
)

http_archive(
name = "org_snakeyaml",
build_file_content = """
Expand Down
15 changes: 15 additions & 0 deletions distdir_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,21 @@ DIST_DEPS = {
"test_WORKSPACE_files",
],
},
# JNI bindings for the zstd compression library, fetched as @zstd-jni in the
# WORKSPACE and bundled into the distribution archive via "additional_distfiles".
"zstd-jni": {
    "archive": "v1.5.0-4.zip",
    "patch_args": ["-p1"],
    "patches": [
        "//third_party:zstd-jni/Native.java.patch",
    ],
    "sha256": "d320d59b89a163c5efccbe4915ae6a49883ce653cdc670643dfa21c6063108e4",
    "urls": [
        "https://mirror.bazel.build/github.com/luben/zstd-jni/archive/v1.5.0-4.zip",
        "https://github.com/luben/zstd-jni/archive/v1.5.0-4.zip",
    ],
    "used_in": [
        "additional_distfiles",
    ],
},
###################################################
#
# Build time dependencies for testing and packaging
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ filegroup(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/options:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/util:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/zstd:srcs",
],
visibility = ["//src:__subpackages__"],
)
Expand Down Expand Up @@ -81,6 +82,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/zstd",
"//src/main/java/com/google/devtools/build/lib/sandbox",
"//src/main/java/com/google/devtools/build/lib/skyframe:mutable_supplier",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
Expand All @@ -94,6 +96,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/main/java/com/google/devtools/common/options",
"//src/main/protobuf:failure_details_java_proto",
"//third_party:apache_commons_compress",
"//third_party:auth",
"//third_party:caffeine",
"//third_party:flogger",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static java.lang.String.format;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -298,9 +299,11 @@ boolean uploadsInProgress() {
}
}

private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) {
String resourceName =
format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes());
private static String buildUploadResourceName(
String instanceName, UUID uuid, Digest digest, boolean compressed) {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d";
String resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes());
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
Expand All @@ -325,7 +328,8 @@ private ListenableFuture<Void> startAsyncUpload(
}

UUID uploadId = UUID.randomUUID();
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
String resourceName =
buildUploadResourceName(instanceName, uploadId, digest, chunker.isCompressed());
AsyncUpload newUpload =
new AsyncUpload(
context,
Expand Down Expand Up @@ -405,7 +409,20 @@ ListenableFuture<Void> start() {
() ->
retrier.executeAsync(
() -> {
if (committedOffset.get() < chunker.getSize()) {
if (chunker.getSize() == 0) {
return immediateVoidFuture();
}
try {
chunker.seek(committedOffset.get());
} catch (IOException e) {
try {
chunker.reset();
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
return Futures.immediateFailedFuture(e);
}
if (chunker.hasNext()) {
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
Expand All @@ -416,13 +433,19 @@ ListenableFuture<Void> start() {
return Futures.transformAsync(
callFuture,
(result) -> {
long committedSize = committedOffset.get();
long expected = chunker.getSize();
if (committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total", committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
if (!chunker.hasNext()) {
// Only check for matching committed size if we have completed the upload.
// If another client did, they might have used a different compression
// level/algorithm, so we cannot know the expected committed offset.
long committedSize = committedOffset.get();
long expected = chunker.getOffset();
if (!chunker.hasNext() && committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total",
committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
}
}
return Futures.immediateFuture(null);
},
Expand Down Expand Up @@ -517,17 +540,6 @@ private ListenableFuture<Void> call(AtomicLong committedOffset) {
.withDeadlineAfter(callTimeoutSecs, SECONDS);
call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions);

try {
chunker.seek(committedOffset.get());
} catch (IOException e) {
try {
chunker.reset();
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
return Futures.immediateFailedFuture(e);
}

SettableFuture<Void> uploadResult = SettableFuture.create();
ClientCall.Listener<WriteResponse> callListener =
new ClientCall.Listener<WriteResponse>() {
Expand Down
107 changes: 77 additions & 30 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;
Expand All @@ -55,6 +57,10 @@ static int getDefaultChunkSize() {
return defaultChunkSize;
}

/** Returns whether this chunker zstd-compresses its output. */
public boolean isCompressed() {
  return this.compressed;
}

/** A piece of a byte[] blob. */
public static final class Chunk {

Expand Down Expand Up @@ -98,19 +104,22 @@ public int hashCode() {
private final int chunkSize;
private final Chunk emptyChunk;

private InputStream data;
private ChunkerInputStream data;
private long offset;
private byte[] chunkCache;

private final boolean compressed;

// Set to true on the first call to next(). This is so that the Chunker can open its data source
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

/**
 * Creates a new chunker.
 *
 * <p>The stripped diff left both the old 3-arg and the new 4-arg constructor signature lines in
 * place; only the 4-arg form matches the body (which assigns {@code compressed}), so it is kept.
 *
 * @param dataSupplier lazily opens the underlying input stream (only invoked on first read)
 * @param size declared size of the uncompressed data, in bytes
 * @param chunkSize maximum number of bytes per chunk
 * @param compressed whether the data is zstd-compressed while chunking
 */
Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
  this.dataSupplier = checkNotNull(dataSupplier);
  this.size = size;
  this.chunkSize = chunkSize;
  this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
  this.compressed = compressed;
}

public long getOffset() {
Expand All @@ -127,13 +136,9 @@ public long getSize() {
* <p>Closes any open resources (file handles, ...).
*/
public void reset() throws IOException {
  // The stripped diff retained both the pre-change cleanup (manual data.close()/field
  // clearing) and the post-change close() call, which would close the stream twice.
  // close() releases the underlying stream and drops the chunk cache; the data source is
  // then reopened lazily on the next call to next().
  close();
  offset = 0;
  initialized = false;
}

/**
Expand All @@ -148,6 +153,9 @@ public void seek(long toOffset) throws IOException {
maybeInitialize();
ByteStreams.skipFully(data, toOffset - offset);
offset = toOffset;
if (data.finished()) {
close();
}
}

/**
Expand All @@ -157,6 +165,27 @@ public boolean hasNext() {
return data != null || !initialized;
}

/** Closes the underlying input stream, if open, and discards the chunk cache. */
private void close() throws IOException {
  if (data != null) {
    data.close();
    data = null;
  }
  // Dropped so that a subsequent next() allocates a fresh buffer.
  chunkCache = null;
}

/**
 * Reads from {@code data} until {@code chunkCache} is full or the stream ends.
 *
 * @return the number of bytes actually read; less than the buffer length only at end of stream
 */
private int read() throws IOException {
  int filled = 0;
  int capacity = chunkCache.length;
  while (filled < capacity) {
    int n = data.read(chunkCache, filled, capacity - filled);
    if (n < 0) {
      return filled;
    }
    filled += n;
  }
  return filled;
}
/**
* Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
*
Expand All @@ -178,46 +207,40 @@ public Chunk next() throws IOException {
return emptyChunk;
}

// The cast to int is safe, because the return value is capped at chunkSize.
int bytesToRead = (int) Math.min(bytesLeft(), chunkSize);
if (bytesToRead == 0) {
if (data.finished()) {
chunkCache = null;
data = null;
throw new NoSuchElementException();
}

if (chunkCache == null) {
// If the output is compressed we can't know how many bytes there are yet to read,
// so we allocate the whole chunkSize, otherwise we try to compute the smallest possible value
// The cast to int is safe, because the return value is capped at chunkSize.
int cacheSize = compressed ? chunkSize : (int) min(getSize() - getOffset(), chunkSize);
// Lazily allocate it in order to save memory on small data.
// 1) bytesToRead < chunkSize: There will only ever be one next() call.
// 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
// 3) bytesToRead > chunkSize: Not possible, due to Math.min above.
chunkCache = new byte[bytesToRead];
chunkCache = new byte[cacheSize];
}

long offsetBefore = offset;
try {
ByteStreams.readFully(data, chunkCache, 0, bytesToRead);
} catch (EOFException e) {
throw new IllegalStateException("Reached EOF, but expected "
+ bytesToRead + " bytes.", e);
}
offset += bytesToRead;

ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
int bytesRead = read();

if (bytesLeft() == 0) {
data.close();
data = null;
chunkCache = null;
ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesRead);

// This has to happen after actualSize has been updated
// or the guard in getActualSize won't work.
offset += bytesRead;
if (data.finished()) {
close();
}

return new Chunk(blob, offsetBefore);
}

/** Returns how many bytes remain before the declared size is reached (size minus offset). */
public long bytesLeft() {
  long remaining = getSize() - getOffset();
  return remaining;
}

private void maybeInitialize() throws IOException {
if (initialized) {
return;
Expand All @@ -226,7 +249,10 @@ private void maybeInitialize() throws IOException {
checkState(offset == 0);
checkState(chunkCache == null);
try {
data = dataSupplier.get();
data =
compressed
? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get()))
: new ChunkerInputStream(dataSupplier.get());
} catch (RuntimeException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw e;
Expand All @@ -242,6 +268,7 @@ public static Builder builder() {
public static class Builder {
private int chunkSize = getDefaultChunkSize();
private long size;
private boolean compressed;
private Supplier<InputStream> inputStream;

public Builder setInput(byte[] data) {
Expand All @@ -251,6 +278,11 @@ public Builder setInput(byte[] data) {
return this;
}

/** Enables or disables zstd compression of the chunker's output; disabled by default. */
public Builder setCompressed(boolean compressed) {
  this.compressed = compressed;
  return this;
}

public Builder setInput(long size, InputStream in) {
checkState(inputStream == null);
checkNotNull(in);
Expand Down Expand Up @@ -305,7 +337,22 @@ public Builder setChunkSize(int chunkSize) {

public Chunker build() {
  checkNotNull(inputStream);
  // The stripped diff retained the stale 3-arg return alongside the new 4-arg one, leaving
  // the real call unreachable; keep only the call that threads the compression flag through.
  return new Chunker(inputStream, size, chunkSize, compressed);
}
}

/**
 * An input-stream wrapper that can report whether the underlying stream is exhausted without
 * consuming any data, by reading one byte ahead and pushing it back.
 */
static class ChunkerInputStream extends PushbackInputStream {
  ChunkerInputStream(InputStream in) {
    super(in);
  }

  /** Returns true when no bytes remain. Never consumes input: any byte read is pushed back. */
  public boolean finished() throws IOException {
    int lookahead = super.read();
    boolean atEof = (lookahead == -1);
    if (!atEof) {
      super.unread(lookahead);
    }
    return atEof;
  }
}
}
Loading