Skip to content

Commit

Permalink
Delay the creation of remoteServerInstanceName.
Browse files Browse the repository at this point in the history
We need `Channel#authority()` to determine `remoteServerInstanceName` when uploading artifacts for BEP. However, we don't need to create it inside `RemoteModule#beforeCommand`, otherwise it has to wait for gRPC channel creation and server capabilities verification.

This CL changes to lazily get the `remoteServerInstanceName` right before it's needed.

Working towards bazelbuild#18607.

PiperOrigin-RevId: 562740380
Change-Id: I05a55e5d9a024f6d6d506da22691b76696b82d6c
  • Loading branch information
coeuvre authored and copybara-github committed Sep 5, 2023
1 parent 25d739b commit 35642f4
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
Expand All @@ -24,6 +25,7 @@
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Ascii;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -76,7 +78,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
private final RemoteCache remoteCache;
private final String buildRequestId;
private final String commandId;
private final String remoteServerInstanceName;
private final String remoteInstanceName;
private final String remoteBytestreamUriPrefix;

private final AtomicBoolean shutdown = new AtomicBoolean();
private final Scheduler scheduler;
Expand All @@ -93,7 +96,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
ExtendedEventHandler reporter,
boolean verboseFailures,
RemoteCache remoteCache,
String remoteServerInstanceName,
String remoteInstanceName,
String remoteBytestreamUriPrefix,
String buildRequestId,
String commandId,
XattrProvider xattrProvider,
Expand All @@ -104,7 +108,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
this.remoteCache = remoteCache;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.remoteServerInstanceName = remoteServerInstanceName;
this.remoteInstanceName = remoteInstanceName;
this.remoteBytestreamUriPrefix = remoteBytestreamUriPrefix;
this.scheduler = Schedulers.from(executor);
this.xattrProvider = xattrProvider;
this.remoteBuildEventUploadMode = remoteBuildEventUploadMode;
Expand Down Expand Up @@ -367,6 +372,21 @@ private Single<List<PathMetadata>> uploadLocalFiles(
.collect(Collectors.toList());
}

private Single<String> getRemoteServerInstanceName(RemoteCache remoteCache) {
if (!Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
return Single.just(remoteBytestreamUriPrefix);
}

return toSingle(remoteCache.cacheProtocol::getAuthority, directExecutor())
.map(
a -> {
if (!Strings.isNullOrEmpty(remoteInstanceName)) {
return a + "/" + remoteInstanceName;
}
return a;
});
}

private Single<PathConverter> doUpload(Map<Path, LocalFile> files) {
if (files.isEmpty()) {
return Single.just(PathConverter.NO_CONVERSION);
Expand Down Expand Up @@ -403,10 +423,15 @@ private Single<PathConverter> doUpload(Map<Path, LocalFile> files) {
.collect(Collectors.toList())
.flatMap(paths -> queryRemoteCache(remoteCache, context, paths))
.flatMap(paths -> uploadLocalFiles(remoteCache, context, paths))
.map(
.flatMap(
paths ->
new PathConverterImpl(
remoteServerInstanceName, paths, remoteBuildEventUploadMode)),
getRemoteServerInstanceName(remoteCache)
.map(
remoteServerInstanceName ->
new PathConverterImpl(
remoteServerInstanceName,
paths,
remoteBuildEventUploadMode))),
RemoteCache::release);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class ByteStreamBuildEventArtifactUploaderFactory implements BuildEventArtifactU
private final ExtendedEventHandler reporter;
private final boolean verboseFailures;
private final RemoteCache remoteCache;
private final String remoteServerInstanceName;
private final String remoteInstanceName;
private final String remoteBytestreamUriPrefix;
private final String buildRequestId;
private final String commandId;
private final RemoteBuildEventUploadMode remoteBuildEventUploadMode;
Expand All @@ -42,15 +43,17 @@ class ByteStreamBuildEventArtifactUploaderFactory implements BuildEventArtifactU
ExtendedEventHandler reporter,
boolean verboseFailures,
RemoteCache remoteCache,
String remoteServerInstanceName,
String remoteInstanceName,
String remoteBytestreamUriPrefix,
String buildRequestId,
String commandId,
RemoteBuildEventUploadMode remoteBuildEventUploadMode) {
this.executor = executor;
this.reporter = reporter;
this.verboseFailures = verboseFailures;
this.remoteCache = remoteCache;
this.remoteServerInstanceName = remoteServerInstanceName;
this.remoteInstanceName = remoteInstanceName;
this.remoteBytestreamUriPrefix = remoteBytestreamUriPrefix;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.remoteBuildEventUploadMode = remoteBuildEventUploadMode;
Expand All @@ -65,7 +68,8 @@ public BuildEventArtifactUploader create(CommandEnvironment env) {
reporter,
verboseFailures,
remoteCache.retain(),
remoteServerInstanceName,
remoteInstanceName,
remoteBytestreamUriPrefix,
buildRequestId,
commandId,
env.getXattrProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ public CacheCapabilities getCacheCapabilities() {
return channel.getServerCapabilities().getCacheCapabilities();
}

@Override
public ListenableFuture<String> getAuthority() {
return channel.withChannelFuture(ch -> Futures.immediateFuture(ch.authority()));
}

@Override
public ListenableFuture<CachedActionResult> downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
Expand Down Expand Up @@ -521,4 +526,8 @@ ListenableFuture<Void> uploadChunker(
Retrier getRetrier() {
return this.retrier;
}

public ReferenceCountedChannel getChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import com.google.devtools.common.options.OptionsBase;
import com.google.devtools.common.options.OptionsParsingResult;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.netty.handler.codec.DecoderException;
Expand Down Expand Up @@ -544,22 +543,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}
}

String remoteBytestreamUriPrefix = remoteOptions.remoteBytestreamUriPrefix;
if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
try {
remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
}
}

RemoteCacheClient cacheClient =
new GrpcCacheClient(
cacheChannel.retain(), callCredentialsProvider, remoteOptions, retrier, digestUtil);
Expand Down Expand Up @@ -652,7 +635,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
env.getReporter(),
verboseFailures,
actionContextProvider.getRemoteCache(),
remoteBytestreamUriPrefix,
remoteOptions.remoteInstanceName,
remoteOptions.remoteBytestreamUriPrefix,
buildRequestId,
invocationId,
remoteOptions.remoteBuildEventUploadMode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public interface RemoteCacheClient extends MissingDigestsFinder {
CacheCapabilities getCacheCapabilities();

ListenableFuture<String> getAuthority();

/**
* A key in the remote action cache. The type wraps around a {@link Digest} of an {@link Action}.
* Action keys are special in that they aren't content-addressable but refer to action results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ public CacheCapabilities getCacheCapabilities() {
return remoteCache.getCacheCapabilities();
}

@Override
public ListenableFuture<String> getAuthority() {
return remoteCache.getAuthority();
}

@Override
public ListenableFuture<CachedActionResult> downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.disk;

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;

import build.bazel.remote.execution.v2.ActionCacheUpdateCapabilities;
Expand Down Expand Up @@ -97,7 +98,7 @@ private ListenableFuture<Void> download(Digest digest, OutputStream out, boolean
} else {
try (InputStream in = p.getInputStream()) {
ByteStreams.copy(in, out);
return Futures.immediateFuture(null);
return immediateFuture(null);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
Expand All @@ -117,7 +118,7 @@ public ListenableFuture<Void> downloadBlob(
Utils.verifyBlobContents(digest, digestOut.digest());
}
out.flush();
return Futures.immediateFuture(null);
return immediateFuture(null);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
Expand Down Expand Up @@ -169,6 +170,11 @@ public CacheCapabilities getCacheCapabilities() {
.build();
}

@Override
public ListenableFuture<String> getAuthority() {
return immediateFuture("");
}

@Override
public ListenableFuture<CachedActionResult> downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
Expand All @@ -177,16 +183,16 @@ public ListenableFuture<CachedActionResult> downloadActionResult(
actionKey, (digest, out) -> download(digest, out, /* isActionCache= */ true)),
actionResult -> {
if (actionResult == null) {
return Futures.immediateFuture(null);
return immediateFuture(null);
}

try {
checkActionResult(actionResult);
} catch (CacheNotFoundException e) {
return Futures.immediateFuture(null);
return immediateFuture(null);
}

return Futures.immediateFuture(CachedActionResult.disk(actionResult));
return immediateFuture(CachedActionResult.disk(actionResult));
},
MoreExecutors.directExecutor());
}
Expand All @@ -196,7 +202,7 @@ public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
try (InputStream data = actionResult.toByteString().newInput()) {
saveFile(actionKey.getDigest().getHash(), data, /* actionResult= */ true);
return Futures.immediateFuture(null);
return immediateFuture(null);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
Expand All @@ -213,7 +219,7 @@ public ListenableFuture<Void> uploadFile(
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
return immediateFuture(null);
}

@Override
Expand All @@ -224,15 +230,15 @@ public ListenableFuture<Void> uploadBlob(
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
return immediateFuture(null);
}

@Override
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
RemoteActionExecutionContext context, Iterable<Digest> digests) {
// Both upload and download check if the file exists before doing I/O. So we don't
// have to do it here.
return Futures.immediateFuture(ImmutableSet.copyOf(digests));
return immediateFuture(ImmutableSet.copyOf(digests));
}

protected Path toPathNoSplit(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@ public CacheCapabilities getCacheCapabilities() {
.build();
}

@Override
public ListenableFuture<String> getAuthority() {
return Futures.immediateFuture("");
}

@Override
public ListenableFuture<CachedActionResult> downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,8 @@ private ByteStreamBuildEventArtifactUploader newArtifactUploader(RemoteCache rem
reporter,
/* verboseFailures= */ true,
remoteCache,
/* remoteServerInstanceName= */ "localhost/instance",
/* remoteInstanceName= */ "",
/* remoteBytestreamUriPrefix= */ "localhost/instance",
/* buildRequestId= */ "none",
/* commandId= */ "none",
SyscallCache.NO_CACHE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public CacheCapabilities getCacheCapabilities() {
return CacheCapabilities.getDefaultInstance();
}

@Override
public ListenableFuture<String> getAuthority() {
return Futures.immediateFuture("");
}

@Override
public ListenableFuture<CachedActionResult> downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
Expand Down

0 comments on commit 35642f4

Please sign in to comment.