Skip to content

Commit

Permalink
Remote: Add support for compression on gRPC cache
Browse files Browse the repository at this point in the history
Add support for compressed transfers from/to gRPC remote caches with flag --experimental_remote_cache_compression.

Fixes bazelbuild#13344.

Closes bazelbuild#14041.

PiperOrigin-RevId: 409328001
  • Loading branch information
AlessandroPatti authored and coeuvre committed Nov 15, 2021
1 parent ae76eac commit f076f6b
Show file tree
Hide file tree
Showing 23 changed files with 1,020 additions and 86 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pkg_tar(
"@com_google_protobuf//:protobuf_java",
"@com_google_protobuf//:protobuf_java_util",
"@com_google_protobuf//:protobuf_javalite",
"@zstd-jni//:zstd-jni",
],
package_dir = "derived/jars",
strip_prefix = "external",
Expand Down
8 changes: 8 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ dist_http_archive(
patch_cmds_win = EXPORT_WORKSPACE_IN_BUILD_FILE_WIN,
)

dist_http_archive(
name = "zstd-jni",
patch_cmds = EXPORT_WORKSPACE_IN_BUILD_BAZEL_FILE,
patch_cmds_win = EXPORT_WORKSPACE_IN_BUILD_BAZEL_FILE_WIN,
build_file = "//third_party:zstd-jni/zstd-jni.BUILD",
strip_prefix = "zstd-jni-1.5.0-4"
)

http_archive(
name = "org_snakeyaml",
build_file_content = """
Expand Down
15 changes: 15 additions & 0 deletions distdir_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,21 @@ DIST_DEPS = {
"test_WORKSPACE_files",
],
},
"zstd-jni": {
"archive": "v1.5.0-4.zip",
"patch_args": ["-p1"],
"patches": [
"//third_party:zstd-jni/Native.java.patch",
],
"sha256": "d320d59b89a163c5efccbe4915ae6a49883ce653cdc670643dfa21c6063108e4",
"urls": [
"https://mirror.bazel.build/github.com/luben/zstd-jni/archive/v1.5.0-4.zip",
"https://github.com/luben/zstd-jni/archive/v1.5.0-4.zip",
],
"used_in": [
"additional_distfiles",
],
},
###################################################
#
# Build time dependencies for testing and packaging
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ filegroup(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/options:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/util:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/zstd:srcs",
],
visibility = ["//src:__subpackages__"],
)
Expand Down Expand Up @@ -81,6 +82,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/zstd",
"//src/main/java/com/google/devtools/build/lib/sandbox",
"//src/main/java/com/google/devtools/build/lib/skyframe:mutable_supplier",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
Expand All @@ -94,6 +96,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/main/java/com/google/devtools/common/options",
"//src/main/protobuf:failure_details_java_proto",
"//third_party:apache_commons_compress",
"//third_party:auth",
"//third_party:caffeine",
"//third_party:flogger",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static java.lang.String.format;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -298,9 +299,11 @@ boolean uploadsInProgress() {
}
}

private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) {
String resourceName =
format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes());
private static String buildUploadResourceName(
String instanceName, UUID uuid, Digest digest, boolean compressed) {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d";
String resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes());
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
Expand All @@ -325,7 +328,8 @@ private ListenableFuture<Void> startAsyncUpload(
}

UUID uploadId = UUID.randomUUID();
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
String resourceName =
buildUploadResourceName(instanceName, uploadId, digest, chunker.isCompressed());
AsyncUpload newUpload =
new AsyncUpload(
context,
Expand Down Expand Up @@ -405,7 +409,20 @@ ListenableFuture<Void> start() {
() ->
retrier.executeAsync(
() -> {
if (committedOffset.get() < chunker.getSize()) {
if (chunker.getSize() == 0) {
return immediateVoidFuture();
}
try {
chunker.seek(committedOffset.get());
} catch (IOException e) {
try {
chunker.reset();
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
return Futures.immediateFailedFuture(e);
}
if (chunker.hasNext()) {
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
Expand All @@ -416,13 +433,19 @@ ListenableFuture<Void> start() {
return Futures.transformAsync(
callFuture,
(result) -> {
long committedSize = committedOffset.get();
long expected = chunker.getSize();
if (committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total", committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
if (!chunker.hasNext()) {
// Only check for matching committed size if we have completed the upload.
// If another client did, they might have used a different compression
// level/algorithm, so we cannot know the expected committed offset
long committedSize = committedOffset.get();
long expected = chunker.getOffset();
if (!chunker.hasNext() && committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total",
committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
}
}
return Futures.immediateFuture(null);
},
Expand Down Expand Up @@ -517,17 +540,6 @@ private ListenableFuture<Void> call(AtomicLong committedOffset) {
.withDeadlineAfter(callTimeoutSecs, SECONDS);
call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions);

try {
chunker.seek(committedOffset.get());
} catch (IOException e) {
try {
chunker.reset();
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
return Futures.immediateFailedFuture(e);
}

SettableFuture<Void> uploadResult = SettableFuture.create();
ClientCall.Listener<WriteResponse> callListener =
new ClientCall.Listener<WriteResponse>() {
Expand Down
107 changes: 77 additions & 30 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;
Expand All @@ -55,6 +57,10 @@ static int getDefaultChunkSize() {
return defaultChunkSize;
}

public boolean isCompressed() {
return compressed;
}

/** A piece of a byte[] blob. */
public static final class Chunk {

Expand Down Expand Up @@ -98,19 +104,22 @@ public int hashCode() {
private final int chunkSize;
private final Chunk emptyChunk;

private InputStream data;
private ChunkerInputStream data;
private long offset;
private byte[] chunkCache;

private final boolean compressed;

// Set to true on the first call to next(). This is so that the Chunker can open its data source
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize) {
Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
this.dataSupplier = checkNotNull(dataSupplier);
this.size = size;
this.chunkSize = chunkSize;
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
this.compressed = compressed;
}

public long getOffset() {
Expand All @@ -127,13 +136,9 @@ public long getSize() {
* <p>Closes any open resources (file handles, ...).
*/
public void reset() throws IOException {
if (data != null) {
data.close();
}
data = null;
close();
offset = 0;
initialized = false;
chunkCache = null;
}

/**
Expand All @@ -148,6 +153,9 @@ public void seek(long toOffset) throws IOException {
maybeInitialize();
ByteStreams.skipFully(data, toOffset - offset);
offset = toOffset;
if (data.finished()) {
close();
}
}

/**
Expand All @@ -157,6 +165,27 @@ public boolean hasNext() {
return data != null || !initialized;
}

/** Closes the input stream and reset chunk cache */
private void close() throws IOException {
if (data != null) {
data.close();
data = null;
}
chunkCache = null;
}

/** Attempts reading at most a full chunk and stores it in the chunkCache buffer */
private int read() throws IOException {
int count = 0;
while (count < chunkCache.length) {
int c = data.read(chunkCache, count, chunkCache.length - count);
if (c < 0) {
break;
}
count += c;
}
return count;
}
/**
* Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
*
Expand All @@ -178,46 +207,40 @@ public Chunk next() throws IOException {
return emptyChunk;
}

// The cast to int is safe, because the return value is capped at chunkSize.
int bytesToRead = (int) Math.min(bytesLeft(), chunkSize);
if (bytesToRead == 0) {
if (data.finished()) {
chunkCache = null;
data = null;
throw new NoSuchElementException();
}

if (chunkCache == null) {
// If the output is compressed we can't know how many bytes there are yet to read,
// so we allocate the whole chunkSize, otherwise we try to compute the smallest possible value
// The cast to int is safe, because the return value is capped at chunkSize.
int cacheSize = compressed ? chunkSize : (int) min(getSize() - getOffset(), chunkSize);
// Lazily allocate it in order to save memory on small data.
// 1) bytesToRead < chunkSize: There will only ever be one next() call.
// 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
// 3) bytestoRead > chunkSize: Not possible, due to Math.min above.
chunkCache = new byte[bytesToRead];
chunkCache = new byte[cacheSize];
}

long offsetBefore = offset;
try {
ByteStreams.readFully(data, chunkCache, 0, bytesToRead);
} catch (EOFException e) {
throw new IllegalStateException("Reached EOF, but expected "
+ bytesToRead + " bytes.", e);
}
offset += bytesToRead;

ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
int bytesRead = read();

if (bytesLeft() == 0) {
data.close();
data = null;
chunkCache = null;
ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesRead);

// This has to happen after actualSize has been updated
// or the guard in getActualSize won't work.
offset += bytesRead;
if (data.finished()) {
close();
}

return new Chunk(blob, offsetBefore);
}

public long bytesLeft() {
return getSize() - getOffset();
}

private void maybeInitialize() throws IOException {
if (initialized) {
return;
Expand All @@ -226,7 +249,10 @@ private void maybeInitialize() throws IOException {
checkState(offset == 0);
checkState(chunkCache == null);
try {
data = dataSupplier.get();
data =
compressed
? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get()))
: new ChunkerInputStream(dataSupplier.get());
} catch (RuntimeException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw e;
Expand All @@ -242,6 +268,7 @@ public static Builder builder() {
public static class Builder {
private int chunkSize = getDefaultChunkSize();
private long size;
private boolean compressed;
private Supplier<InputStream> inputStream;

public Builder setInput(byte[] data) {
Expand All @@ -251,6 +278,11 @@ public Builder setInput(byte[] data) {
return this;
}

public Builder setCompressed(boolean compressed) {
this.compressed = compressed;
return this;
}

public Builder setInput(long size, InputStream in) {
checkState(inputStream == null);
checkNotNull(in);
Expand Down Expand Up @@ -305,7 +337,22 @@ public Builder setChunkSize(int chunkSize) {

public Chunker build() {
checkNotNull(inputStream);
return new Chunker(inputStream, size, chunkSize);
return new Chunker(inputStream, size, chunkSize, compressed);
}
}

static class ChunkerInputStream extends PushbackInputStream {
ChunkerInputStream(InputStream in) {
super(in);
}

public boolean finished() throws IOException {
int c = super.read();
if (c == -1) {
return true;
}
super.unread(c);
return false;
}
}
}
Loading

0 comments on commit f076f6b

Please sign in to comment.