Skip to content

Commit

Permalink
Make the BesUploadMode per transport instead of per invocation.
Browse files Browse the repository at this point in the history
As a result, `--build_event_[text|json|binary]_file_upload_mode` are introduced to set the upload mode for build event file respectively. The default value for them is `wait_for_upload_complete`. The implicit requirement on `--bes_upload_mode=wait_for_upload_complete` is also removed.

Alternate to bazelbuild#19322.

PiperOrigin-RevId: 581903589
Change-Id: I7e35f8b43f21e00fad80f5c83c9e0d0f3f435737
  • Loading branch information
coeuvre authored and copybara-github committed Nov 13, 2023
1 parent 1f75299 commit 353000e
Show file tree
Hide file tree
Showing 18 changed files with 314 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -80,9 +81,11 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -113,6 +116,9 @@ public abstract class BuildEventServiceModule<OptionsT extends BuildEventService
private boolean isRunsPerTestOverTheLimit;
private BuildEventArtifactUploaderFactory uploaderFactoryToCleanup;

private BuildEventOutputStreamFactory buildEventOutputStreamFactory =
file -> new BufferedOutputStream(Files.newOutputStream(Paths.get(file)));

/**
* Holds the close futures for the upload of each transport with timeouts attached to them using
* {@link #constructCloseFuturesMapWithTimeouts(ImmutableMap)} obtained from {@link
Expand All @@ -133,8 +139,6 @@ public abstract class BuildEventServiceModule<OptionsT extends BuildEventService
private ImmutableMap<BuildEventTransport, ListenableFuture<Void>>
halfCloseFuturesWithTimeoutsMap = ImmutableMap.of();

private BesUploadMode previousUploadMode = BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE;

// TODO(lpino): Use Optional instead of @Nullable for the members below.
@Nullable private OutErr outErr;
@Nullable private ImmutableSet<BuildEventTransport> bepTransports;
Expand Down Expand Up @@ -198,6 +202,17 @@ private void cancelAndResetPendingUploads() {
resetPendingUploads();
}

private void removeFromPendingUploads(
Map<BuildEventTransport, ListenableFuture<Void>> transportFutures) {
transportFutures
.values()
.forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
closeFuturesWithTimeoutsMap =
closeFuturesWithTimeoutsMap.entrySet().stream()
.filter(entry -> !transportFutures.containsKey(entry.getKey()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
}

private static boolean isTimeoutException(ExecutionException e) {
return e.getCause() instanceof TimeoutException;
}
Expand All @@ -219,20 +234,31 @@ private void waitForPreviousInvocation(boolean isShutdown) {
return;
}

ImmutableMap<BuildEventTransport, ListenableFuture<Void>> waitingFutureMap = null;
boolean cancelCloseFutures = true;
switch (previousUploadMode) {
case FULLY_ASYNC:
waitingFutureMap =
isShutdown ? closeFuturesWithTimeoutsMap : halfCloseFuturesWithTimeoutsMap;
cancelCloseFutures = false;
break;
case WAIT_FOR_UPLOAD_COMPLETE:
case NOWAIT_FOR_UPLOAD_COMPLETE:
waitingFutureMap = closeFuturesWithTimeoutsMap;
cancelCloseFutures = true;
break;
}
ImmutableMap<BuildEventTransport, ListenableFuture<Void>> waitingFutureMap =
closeFuturesWithTimeoutsMap.entrySet().stream()
.map(
entry -> {
var transport = entry.getKey();
var closeFuture = entry.getValue();
ListenableFuture<Void> future = closeFuture;
if (transport.getBesUploadMode() == BesUploadMode.FULLY_ASYNC) {
future =
isShutdown ? closeFuture : halfCloseFuturesWithTimeoutsMap.get(transport);
if (future == null) {
future = closeFuture;
}
}
return new SimpleEntry<>(transport, future);
})
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
ImmutableMap<BuildEventTransport, ListenableFuture<Void>> cancelCloseFutures =
closeFuturesWithTimeoutsMap.entrySet().stream()
.filter(
entry -> {
var transport = entry.getKey();
return transport.getBesUploadMode() != BesUploadMode.FULLY_ASYNC;
})
.collect(toImmutableMap(Entry::getKey, Entry::getValue));

Stopwatch stopwatch = Stopwatch.createStarted();
try {
Expand Down Expand Up @@ -261,7 +287,7 @@ private void waitForPreviousInvocation(boolean isShutdown) {
waitedMillis / 1000, waitedMillis % 1000);
reporter.handle(Event.warn(msg));
logger.atWarning().withCause(exception).log("%s", msg);
cancelCloseFutures = true;
cancelCloseFutures = closeFuturesWithTimeoutsMap;
} catch (ExecutionException e) {
String msg;
// Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
Expand All @@ -281,13 +307,12 @@ private void waitForPreviousInvocation(boolean isShutdown) {
}
reporter.handle(Event.warn(msg));
logger.atWarning().withCause(e).log("%s", msg);
cancelCloseFutures = true;
cancelCloseFutures = closeFuturesWithTimeoutsMap;
} finally {
if (cancelCloseFutures) {
cancelAndResetPendingUploads();
} else {
resetPendingUploads();
}
cancelCloseFutures
.values()
.forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
resetPendingUploads();
}
}

Expand Down Expand Up @@ -481,8 +506,7 @@ public void blazeShutdown() {
}

private void waitForBuildEventTransportsToClose(
Map<BuildEventTransport, ListenableFuture<Void>> transportFutures,
boolean besUploadModeIsSynchronous)
Map<BuildEventTransport, ListenableFuture<Void>> transportFutures)
throws AbruptExitException {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -519,9 +543,7 @@ private void waitForBuildEventTransportsToClose(
e.getCause().getMessage()),
e);
} finally {
if (besUploadModeIsSynchronous) {
cancelAndResetPendingUploads();
}
removeFromPendingUploads(transportFutures);
executor.shutdown();
}
}
Expand Down Expand Up @@ -579,17 +601,16 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}

private void closeBepTransports() throws AbruptExitException {
previousUploadMode = besOptions.besUploadMode;
closeFuturesWithTimeoutsMap =
constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
halfCloseFuturesWithTimeoutsMap =
constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap());
boolean besUploadModeIsSynchronous =
besOptions.besUploadMode == BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE;
Map<BuildEventTransport, ListenableFuture<Void>> blockingTransportFutures = new HashMap<>();
for (Map.Entry<BuildEventTransport, ListenableFuture<Void>> entry :
closeFuturesWithTimeoutsMap.entrySet()) {
BuildEventTransport bepTransport = entry.getKey();
boolean besUploadModeIsSynchronous =
bepTransport.getBesUploadMode() == BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE;
if (!bepTransport.mayBeSlow() || besUploadModeIsSynchronous) {
blockingTransportFutures.put(bepTransport, entry.getValue());
} else {
Expand All @@ -599,7 +620,7 @@ private void closeBepTransports() throws AbruptExitException {
}
}
if (!blockingTransportFutures.isEmpty()) {
waitForBuildEventTransportsToClose(blockingTransportFutures, besUploadModeIsSynchronous);
waitForBuildEventTransportsToClose(blockingTransportFutures);
}
}

Expand Down Expand Up @@ -764,16 +785,18 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventTextFile)) {
try {
BufferedOutputStream bepTextOutputStream =
new BufferedOutputStream(
Files.newOutputStream(Paths.get(besStreamOptions.buildEventTextFile)));

buildEventOutputStreamFactory.create(besStreamOptions.buildEventTextFile);
BuildEventArtifactUploader localFileUploader =
besStreamOptions.buildEventTextFilePathConversion
? uploaderSupplier.get()
: new LocalFilesArtifactUploader();
bepTransportsBuilder.add(
new TextFormatFileTransport(
bepTextOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
bepTextOutputStream,
bepOptions,
localFileUploader,
artifactGroupNamer,
besStreamOptions.buildEventTextFileUploadMode));
} catch (IOException exception) {
// TODO(b/125216340): Consider making this a warning instead of an error once the
// associated bug has been resolved.
Expand All @@ -791,16 +814,18 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventBinaryFile)) {
try {
BufferedOutputStream bepBinaryOutputStream =
new BufferedOutputStream(
Files.newOutputStream(Paths.get(besStreamOptions.buildEventBinaryFile)));

buildEventOutputStreamFactory.create(besStreamOptions.buildEventBinaryFile);
BuildEventArtifactUploader localFileUploader =
besStreamOptions.buildEventBinaryFilePathConversion
? uploaderSupplier.get()
: new LocalFilesArtifactUploader();
bepTransportsBuilder.add(
new BinaryFormatFileTransport(
bepBinaryOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
bepBinaryOutputStream,
bepOptions,
localFileUploader,
artifactGroupNamer,
besStreamOptions.buildEventBinaryFileUploadMode));
} catch (IOException exception) {
// TODO(b/125216340): Consider making this a warning instead of an error once the
// associated bug has been resolved.
Expand All @@ -818,8 +843,7 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventJsonFile)) {
try {
BufferedOutputStream bepJsonOutputStream =
new BufferedOutputStream(
Files.newOutputStream(Paths.get(besStreamOptions.buildEventJsonFile)));
buildEventOutputStreamFactory.create(besStreamOptions.buildEventJsonFile);
BuildEventArtifactUploader localFileUploader =
besStreamOptions.buildEventJsonFilePathConversion
? uploaderSupplier.get()
Expand All @@ -830,7 +854,8 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
bepOptions,
localFileUploader,
artifactGroupNamer,
makeJsonTypeRegistry()));
makeJsonTypeRegistry(),
besStreamOptions.buildEventJsonFileUploadMode));
} catch (IOException exception) {
// TODO(b/125216340): Consider making this a warning instead of an error once the
// associated bug has been resolved.
Expand Down Expand Up @@ -877,6 +902,11 @@ protected abstract BuildEventServiceClient getBesClient(

protected abstract Set<String> allowedCommands(OptionsT besOptions);

@VisibleForTesting
void setBuildEventOutputStreamFactory(BuildEventOutputStreamFactory factory) {
this.buildEventOutputStreamFactory = factory;
}

protected ImmutableSet<String> getBesKeywords(
OptionsT besOptions, @Nullable OptionsParsingResult startupOptionsProvider) {
List<String> userKeywords = besOptions.besKeywords;
Expand Down Expand Up @@ -933,4 +963,9 @@ BuildEventArtifactUploader get() throws IOException {
throw exception;
}
}

@VisibleForTesting
interface BuildEventOutputStreamFactory {
BufferedOutputStream create(String file) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
Expand All @@ -37,6 +38,7 @@
public class BuildEventServiceTransport implements BuildEventTransport {
private final BuildEventServiceUploader besUploader;
private final Duration besTimeout;
private final BesUploadMode besUploadMode;

private BuildEventServiceTransport(
BuildEventServiceClient besClient,
Expand All @@ -49,7 +51,8 @@ private BuildEventServiceTransport(
EventBus eventBus,
Duration closeTimeout,
Sleeper sleeper,
Timestamp commandStartTime) {
Timestamp commandStartTime,
BesUploadMode besUploadMode) {
this.besTimeout = closeTimeout;
this.besUploader =
new BuildEventServiceUploader.Builder()
Expand All @@ -64,6 +67,7 @@ private BuildEventServiceTransport(
.eventBus(eventBus)
.commandStartTime(commandStartTime)
.build();
this.besUploadMode = besUploadMode;
}

@Override
Expand Down Expand Up @@ -91,6 +95,11 @@ public boolean mayBeSlow() {
return true;
}

@Override
public BesUploadMode getBesUploadMode() {
return besUploadMode;
}

@Override
public void sendBuildEvent(BuildEvent event) {
besUploader.enqueueEvent(event);
Expand Down Expand Up @@ -188,7 +197,8 @@ public BuildEventServiceTransport build() {
checkNotNull(eventBus),
(besOptions.besTimeout != null) ? besOptions.besTimeout : Duration.ZERO,
sleeper != null ? sleeper : new JavaSleeper(),
checkNotNull(commandStartTime));
checkNotNull(commandStartTime),
besOptions.besUploadMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ java_library(
deps = [
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/cmdline",
"//src/main/java/com/google/devtools/build/lib/collect/nestedset",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventstream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import java.time.Duration;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -24,14 +24,12 @@
*
* <p>All implementations need to be thread-safe. All methods are expected to return quickly.
*
* <p>Notice that this interface does not provide any error handling API. A transport may choose
* to log interesting errors to the command line and/or abort the whole build.
* <p>Notice that this interface does not provide any error handling API. A transport may choose to
* log interesting errors to the command line and/or abort the whole build.
*/
@ThreadSafe
public interface BuildEventTransport {
/**
* The name of this transport as can be displayed to a user.
*/
/** The name of this transport as can be displayed to a user. */
String name();

/**
Expand Down Expand Up @@ -87,7 +85,9 @@ default Duration getTimeout() {
*/
boolean mayBeSlow();

@VisibleForTesting
/** Returns the desired {@link BesUploadMode} for the transport. */
BesUploadMode getBesUploadMode();

@Nullable
BuildEventArtifactUploader getUploader();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ java_library(
name = "transports",
srcs = glob(["*.java"]),
deps = [
"//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options",
"//src/main/java/com/google/devtools/build/lib/buildeventstream",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.devtools.build.lib.buildeventstream.transports;

import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
Expand All @@ -34,8 +35,9 @@ public BinaryFormatFileTransport(
BufferedOutputStream outputStream,
BuildEventProtocolOptions options,
BuildEventArtifactUploader uploader,
ArtifactGroupNamer namer) {
super(outputStream, options, uploader, namer);
ArtifactGroupNamer namer,
BesUploadMode besUploadMode) {
super(outputStream, options, uploader, namer, besUploadMode);
}

@Override
Expand Down
Loading

0 comments on commit 353000e

Please sign in to comment.