Skip to content

Commit

Permalink
fix: update gRPC writeAndClose to only set finish_write on the last m…
Browse files Browse the repository at this point in the history
…essage (#2163)

As of 2.26.0 it would set finish_write on every message emitted by writeAndClose

Update ITGapicUnbufferedWritableByteChannelTest to also include checksum values in its requests/responses.
  • Loading branch information
BenWhitehead authored Aug 11, 2023
1 parent e9746f8 commit 95df758
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import org.checkerframework.checker.nullness.qual.NonNull;

final class GapicUnbufferedWritableByteChannel<
RequestFactoryT extends WriteObjectRequestBuilderFactory>
Expand Down Expand Up @@ -82,16 +83,7 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
if (!finished) {
long offset = writeCtx.getTotalSentBytes().get();
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();

WriteObjectRequest.Builder b =
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
if (crc32cValue != null) {
b.setObjectChecksums(
ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
}
WriteObjectRequest message = b.build();
WriteObjectRequest message = finishMessage();
try {
flusher.close(message);
finished = true;
Expand Down Expand Up @@ -139,7 +131,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
.newRequestBuilder()
.setWriteOffset(offset)
.setChecksummedData(checksummedData.build());
if (!datum.isOnlyFullBlocks() || finalize) {
if (!datum.isOnlyFullBlocks()) {
builder.setFinishWrite(true);
if (cumulative != null) {
builder.setObjectChecksums(
Expand All @@ -152,6 +144,10 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
messages.add(build);
bytesConsumed += contentSize;
}
if (finalize && !finished) {
messages.add(finishMessage());
finished = true;
}

try {
flusher.flush(messages);
Expand All @@ -162,4 +158,18 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo

return bytesConsumed;
}

@NonNull
private WriteObjectRequest finishMessage() {
long offset = writeCtx.getTotalSentBytes().get();
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();

WriteObjectRequest.Builder b =
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
if (crc32cValue != null) {
b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
}
WriteObjectRequest message = b.build();
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ private static WriteObjectRequest possiblyPairDownRequest(
}

if (message.getWriteOffset() > 0) {
b.clearWriteObjectSpec().clearObjectChecksums();
b.clearWriteObjectSpec();
}

if (message.getWriteOffset() > 0 && !message.getFinishWrite()) {
b.clearObjectChecksums();
}
return b.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.StartResumableWriteResponse;
import com.google.storage.v2.StorageClient;
Expand Down Expand Up @@ -63,8 +64,9 @@ public final class ITGapicUnbufferedWritableByteChannelTest {
private static final Logger LOGGER =
Logger.getLogger(ITGapicUnbufferedWritableByteChannelTest.class.getName());

private static final Hasher HASHER = Hasher.enabled();
private static final ChunkSegmenter segmenter =
new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), 10, 5);
new ChunkSegmenter(HASHER, ByteStringStrategy.copy(), 10, 5);

private static final String uploadId = "upload-id";

Expand All @@ -80,31 +82,35 @@ public final class ITGapicUnbufferedWritableByteChannelTest {
private static final WriteObjectRequest req1 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 0, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 0, 10), HASHER))
.build();
private static final WriteObjectRequest req2 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(10)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 10, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 10, 10), HASHER))
.build();
private static final WriteObjectRequest req3 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(20)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 20, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 20, 10), HASHER))
.build();
private static final WriteObjectRequest req4 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(30)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 30, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 30, 10), HASHER))
.build();
private static final WriteObjectRequest req5 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(40)
.setFinishWrite(true)
.setObjectChecksums(
ObjectChecksums.newBuilder()
.setCrc32C(HASHER.hash(ByteBuffer.wrap(bytes)).getValue())
.build())
.build();

private static final WriteObjectResponse resp1 =
Expand All @@ -123,35 +129,24 @@ public final class ITGapicUnbufferedWritableByteChannelTest {

@Test
public void directUpload() throws IOException, InterruptedException, ExecutionException {
Object obj = Object.newBuilder().setBucket("buck").setName("obj").build();
WriteObjectSpec spec = WriteObjectSpec.newBuilder().setResource(obj).build();

byte[] bytes = DataGenerator.base64Characters().genBytes(40);
WriteObjectRequest req1 =
WriteObjectRequest.newBuilder()
ITGapicUnbufferedWritableByteChannelTest.req1
.toBuilder()
.clearUploadId()
.setWriteObjectSpec(spec)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 0, 10)))
.build();
WriteObjectRequest req2 =
WriteObjectRequest.newBuilder()
.setWriteOffset(10)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 10, 10)))
.build();
ITGapicUnbufferedWritableByteChannelTest.req2.toBuilder().clearUploadId().build();
WriteObjectRequest req3 =
WriteObjectRequest.newBuilder()
.setWriteOffset(20)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 20, 10)))
.build();
ITGapicUnbufferedWritableByteChannelTest.req3.toBuilder().clearUploadId().build();
WriteObjectRequest req4 =
WriteObjectRequest.newBuilder()
.setWriteOffset(30)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 30, 10)))
.build();
ITGapicUnbufferedWritableByteChannelTest.req4.toBuilder().clearUploadId().build();
WriteObjectRequest req5 =
WriteObjectRequest.newBuilder().setWriteOffset(40).setFinishWrite(true).build();
ITGapicUnbufferedWritableByteChannelTest.req5.toBuilder().clearUploadId().build();

WriteObjectResponse resp =
WriteObjectResponse.newBuilder().setResource(obj.toBuilder().setSize(40)).build();
WriteObjectResponse resp = resp5;

WriteObjectRequest base = WriteObjectRequest.newBuilder().setWriteObjectSpec(spec).build();
WriteObjectRequestBuilderFactory reqFactory = WriteObjectRequestBuilderFactory.simple(base);
Expand Down Expand Up @@ -314,10 +309,7 @@ public boolean shouldRetry(Throwable t, Object ignore) {

@Test
public void resumableUpload_finalizeWhenWriteAndCloseCalledEvenWhenQuantumAligned()
throws IOException, InterruptedException, ExecutionException {
int quantum = 10;
ChunkSegmenter segmenter =
new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), 50, quantum);
throws IOException {
SettableApiFuture<WriteObjectResponse> result = SettableApiFuture.create();

AtomicReference<List<WriteObjectRequest>> actualFlush = new AtomicReference<>();
Expand All @@ -342,18 +334,10 @@ public void close(@Nullable WriteObjectRequest req) {
}
});

byte[] bytes = DataGenerator.base64Characters().genBytes(quantum);

long written = c.writeAndClose(ByteBuffer.wrap(bytes));
WriteObjectRequest expectedRequest =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes), Hasher.noop()))
.setFinishWrite(true)
.build();

assertThat(written).isEqualTo(10);
assertThat(actualFlush.get()).isEqualTo(ImmutableList.of(expectedRequest));
assertThat(written).isEqualTo(40);
assertThat(actualFlush.get()).isEqualTo(ImmutableList.of(req1, req2, req3, req4, req5));
// calling close is okay, as long as the provided request is null
assertThat(actualClose.get()).isAnyOf(closeRequestSentinel, null);
}
Expand Down

0 comments on commit 95df758

Please sign in to comment.