Skip to content

Commit

Permalink
Only check for expected committed size if we completed the upload
Browse files Browse the repository at this point in the history
  • Loading branch information
AlessandroPatti committed Oct 15, 2021
1 parent c900c54 commit e8369f5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,19 @@ ListenableFuture<Void> start() {
return Futures.transformAsync(
callFuture,
(result) -> {
long committedSize = committedOffset.get();
long expected = chunker.getFinalSize();
if (committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total", committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
if (!chunker.hasNext()) {
// Only check for matching committed size if we have completed the upload.
// If another client did, they might have used a different compression
// level/algorithm, so we cannot know the expected committed offset
long committedSize = committedOffset.get();
long expected = chunker.getOffset();
if (!chunker.hasNext() && committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total",
committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
}
}
return Futures.immediateFuture(null);
},
Expand Down
28 changes: 0 additions & 28 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,6 @@ public long getSize() {
return size;
}

/**
* Closes the input stream and return the final size,
* that is the source size if the transfer is uncompressed,
* or the compressed size otherwise.
*/
public long getFinalSize() throws IOException {
if (compressed && getSize() > 0) {
// If the source is compressed, we cannot know the final size
// until we have exhausted the whole input.
exhaust();
close();
return getOffset();
}
close();
return getSize();
}

/**
* Reset the {@link Chunker} state to when it was newly constructed.
*
Expand Down Expand Up @@ -175,17 +158,6 @@ public void seek(long toOffset) throws IOException {
}
}

/**
* Consume the input stream and closes it
* @throws IOException
*/
private void exhaust() throws IOException {
maybeInitialize();
while(hasNext()) {
next();
}
}

/**
* Returns {@code true} if a subsequent call to {@link #next()} returns a {@link Chunk} object;
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,10 @@ public void progressiveCompressedUploadShouldWork() throws Exception {
Chunker chunker = Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());

long expectedSize = chunker.getFinalSize();
while (chunker.hasNext()) {
chunker.next();
}
long expectedSize = chunker.getOffset();
chunker.reset();

serviceRegistry.addService(
Expand Down Expand Up @@ -639,7 +642,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}

@Test
public void incorrectCommittedSizeFailsUpload() throws Exception {
public void incorrectCommittedSizeFailsCompletedUpload() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
ByteStreamUploader uploader =
Expand All @@ -660,10 +663,23 @@ public void incorrectCommittedSizeFailsUpload() throws Exception {
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
streamObserver.onNext(
WriteResponse.newBuilder().setCommittedSize(blob.length + 1).build());
streamObserver.onCompleted();
return new NoopStreamObserver();
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest writeRequest) {}

@Override
public void onError(Throwable throwable) {
fail("onError should never be called.");
}

@Override
public void onCompleted() {
WriteResponse response =
WriteResponse.newBuilder().setCommittedSize(blob.length + 1).build();
streamObserver.onNext(response);
streamObserver.onCompleted();
}
};
}
});

Expand All @@ -680,6 +696,38 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
blockUntilInternalStateConsistent(uploader);
}

@Test
public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception {
  // Regression test for the relaxed committed-size check: when the server
  // completes the stream before the client has sent the whole blob (e.g.
  // another client already wrote it, possibly with a different compression),
  // the client must NOT fail on a committed_size that differs from its own
  // offset — the check only applies once the chunker is exhausted.
  RemoteRetrier retrier =
      TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
  ByteStreamUploader uploader =
      new ByteStreamUploader(
          INSTANCE_NAME,
          new ReferenceCountedChannel(channelConnectionFactory),
          CallCredentialsProvider.NO_CREDENTIALS,
          /* callTimeoutSecs= */ 300,
          retrier);

  // Blob spans three chunks, so completing after one chunk leaves the upload
  // necessarily incomplete on the client side.
  byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
  new Random().nextBytes(blob);

  Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
  HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());

  serviceRegistry.addService(
      new ByteStreamImplBase() {
        @Override
        public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
          // Immediately report a committed size covering only the first chunk
          // and complete the call, simulating a server-side early finish.
          streamObserver.onNext(WriteResponse.newBuilder().setCommittedSize(CHUNK_SIZE).build());
          streamObserver.onCompleted();
          return new NoopStreamObserver();
        }
      });

  // Must not throw even though committedSize != blob.length.
  uploader.uploadBlob(context, hash, chunker, true);
  blockUntilInternalStateConsistent(uploader);
}

@Test
public void multipleBlobsUploadShouldWork() throws Exception {
RemoteRetrier retrier =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void testActualSizeIsCorrectAfterSeek() throws IOException {
chunker.seek(5);
chunker.next();
assertThat(chunker.hasNext()).isFalse();
assertThat(chunker.getFinalSize()).isEqualTo(expected);
assertThat(chunker.getOffset()).isEqualTo(expected);
}
}

Expand Down

0 comments on commit e8369f5

Please sign in to comment.