diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java
index 8d51ab3b86..678f138ef1 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java
@@ -18,6 +18,7 @@
 
 import static com.google.cloud.RetryHelper.runWithRetries;
 
+import com.google.api.client.util.Preconditions;
 import com.google.api.gax.retrying.ResultRetryAlgorithm;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.ReadChannel;
@@ -52,6 +53,7 @@ class BlobReadChannel implements ReadChannel {
   private final StorageObject storageObject;
   private int bufferPos;
   private byte[] buffer;
+  private long limit;
 
   BlobReadChannel(
       StorageOptions serviceOptions, BlobId blob, Map<StorageRpc.Option, ?> requestOptions) {
@@ -62,6 +64,7 @@ class BlobReadChannel implements ReadChannel {
     isOpen = true;
     storageRpc = serviceOptions.getStorageRpcV1();
     storageObject = blob.toPb();
+    this.limit = Long.MAX_VALUE;
   }
 
   @Override
@@ -71,7 +74,8 @@ public RestorableState<ReadChannel> capture() {
             .setPosition(position)
             .setIsOpen(isOpen)
             .setEndOfStream(endOfStream)
-            .setChunkSize(chunkSize);
+            .setChunkSize(chunkSize)
+            .setLimit(limit);
     if (buffer != null) {
       builder.setPosition(position + bufferPos);
       builder.setEndOfStream(false);
@@ -119,7 +123,8 @@ public int read(ByteBuffer byteBuffer) throws IOException {
     if (endOfStream) {
       return -1;
     }
-    final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
+    final int toRead =
+        Math.toIntExact(Math.min(limit - position, Math.max(byteBuffer.remaining(), chunkSize)));
     try {
       ResultRetryAlgorithm<?> algorithm =
           retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions);
@@ -158,6 +163,18 @@ public int read(ByteBuffer byteBuffer) throws IOException {
     return toWrite;
   }
 
+  @Override
+  public ReadChannel limit(long limit) {
+    Preconditions.checkArgument(limit >= 0, "Limit must be >= 0");
+    this.limit = limit;
+    return this;
+  }
+
+  @Override
+  public long limit() {
+    return limit;
+  }
+
   static class StateImpl implements RestorableState<ReadChannel>, Serializable {
 
     private static final long serialVersionUID = 3889420316004453706L;
@@ -170,6 +187,7 @@ static class StateImpl implements RestorableState<ReadChannel>, Serializable {
     private final boolean isOpen;
     private final boolean endOfStream;
     private final int chunkSize;
+    private final long limit;
 
     StateImpl(Builder builder) {
       this.serviceOptions = builder.serviceOptions;
@@ -180,6 +198,7 @@ static class StateImpl implements RestorableState<ReadChannel>, Serializable {
       this.isOpen = builder.isOpen;
       this.endOfStream = builder.endOfStream;
       this.chunkSize = builder.chunkSize;
+      this.limit = builder.limit;
     }
 
     static class Builder {
@@ -191,6 +210,7 @@ static class Builder {
       private boolean isOpen;
       private boolean endOfStream;
       private int chunkSize;
+      private long limit;
 
       private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
         this.serviceOptions = options;
@@ -223,6 +243,11 @@ Builder setChunkSize(int chunkSize) {
         return this;
       }
 
+      Builder setLimit(long limit) {
+        this.limit = limit;
+        return this;
+      }
+
       RestorableState<ReadChannel> build() {
         return new StateImpl(this);
       }
@@ -241,13 +266,22 @@ public ReadChannel restore() {
       channel.isOpen = isOpen;
       channel.endOfStream = endOfStream;
       channel.chunkSize = chunkSize;
+      channel.limit = limit;
       return channel;
     }
 
     @Override
     public int hashCode() {
       return Objects.hash(
-          serviceOptions, blob, requestOptions, lastEtag, position, isOpen, endOfStream, chunkSize);
+          serviceOptions,
+          blob,
+          requestOptions,
+          lastEtag,
+          position,
+          isOpen,
+          endOfStream,
+          chunkSize,
+          limit);
     }
 
     @Override
@@ -266,7 +300,8 @@ public boolean equals(Object obj) {
           && this.position == other.position
           && this.isOpen == other.isOpen
           && this.endOfStream == other.endOfStream
-          && this.chunkSize == other.chunkSize;
+          && this.chunkSize == other.chunkSize
+          && this.limit == other.limit;
     }
 
     @Override
@@ -276,6 +311,7 @@ public String toString() {
           .add("position", position)
           .add("isOpen", isOpen)
           .add("endOfStream", endOfStream)
+          .add("limit", limit)
           .toString();
     }
   }
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java
index dafb0b2a5b..6eb2b8aa16 100644
--- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java
@@ -223,13 +223,19 @@ public void testSaveAndRestore() throws IOException {
   public void testStateEquals() {
     replay(storageRpcMock);
     reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
+    int limit = 342;
+    reader.limit(limit);
     @SuppressWarnings("resource") // avoid closing when you don't want partial writes to GCS
     ReadChannel secondReader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS);
+    secondReader.limit(limit);
     RestorableState<ReadChannel> state = reader.capture();
     RestorableState<ReadChannel> secondState = secondReader.capture();
     assertEquals(state, secondState);
     assertEquals(state.hashCode(), secondState.hashCode());
     assertEquals(state.toString(), secondState.toString());
+
+    ReadChannel restore = secondState.restore();
+    assertEquals(limit, restore.limit());
   }
 
   private static byte[] randomByteArray(int size) {
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java
new file mode 100644
index 0000000000..3f1470a02c
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage.it;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.DataGeneration;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.cloud.storage.conformance.retry.TestBench;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public final class ITBlobReadChannelTest {
+
+  private static final int _16MiB = 16 * 1024 * 1024;
+  private static final int _256KiB = 256 * 1024;
+
+  @ClassRule
+  public static final TestBench testBench =
+      TestBench.newBuilder().setContainerName("blob-read-channel-test").build();
+
+  @Rule public final TestName testName = new TestName();
+
+  @Rule public final DataGeneration dataGeneration = new DataGeneration(new Random(872364872));
+
+  @Test
+  public void testLimit_smallerThanOneChunk() throws IOException {
+    int srcContentSize = _256KiB;
+    int rangeBegin = 57;
+    int rangeEnd = 2384;
+    int chunkSize = _16MiB;
+    doLimitTest(srcContentSize, rangeBegin, rangeEnd, chunkSize);
+  }
+
+  @Test
+  public void testLimit_largerThanOneChunk() throws IOException {
+    int srcContentSize = _16MiB + (_256KiB * 3);
+    int rangeBegin = 384;
+    int rangeEnd = rangeBegin + _16MiB;
+    int chunkSize = _16MiB;
+
+    doLimitTest(srcContentSize, rangeBegin, rangeEnd, chunkSize);
+  }
+
+  private void doLimitTest(int srcContentSize, int rangeBegin, int rangeEnd, int chunkSize)
+      throws IOException {
+    Storage s =
+        StorageOptions.newBuilder()
+            .setProjectId("blob-read-channel-test")
+            .setHost(testBench.getBaseUri())
+            .setCredentials(NoCredentials.getInstance())
+            .build()
+            .getService();
+
+    String testNameMethodName = testName.getMethodName();
+    String bucketName = String.format("bucket-%s", testNameMethodName.toLowerCase());
+    String blobName = String.format("%s/src", testNameMethodName);
+
+    Bucket bucket = s.create(BucketInfo.of(bucketName));
+    BlobInfo src = BlobInfo.newBuilder(bucket, blobName).build();
+    ByteBuffer content = dataGeneration.randByteBuffer(srcContentSize);
+    ByteBuffer expectedSubContent = content.duplicate();
+    expectedSubContent.position(rangeBegin);
+    expectedSubContent.limit(rangeEnd);
+    try (WriteChannel writer = s.writer(src)) {
+      writer.write(content);
+    }
+
+    ByteBuffer actual = ByteBuffer.allocate(rangeEnd - rangeBegin);
+
+    try (ReadChannel reader = s.reader(src.getBlobId())) {
+      reader.setChunkSize(chunkSize);
+      reader.seek(rangeBegin);
+      reader.limit(rangeEnd);
+      reader.read(actual);
+      actual.flip();
+    }
+
+    assertThat(actual).isEqualTo(expectedSubContent);
+  }
+}
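Usage sketch (reviewer note, not part of the patch): the example below shows how the `limit(long)` setter introduced in this diff might be combined with `seek(long)` to download a single byte range, mirroring `doLimitTest` above. The bucket and object names are placeholders; `limit` is an absolute offset into the object, so `seek(begin)` followed by `limit(end)` caps the download at the half-open range `[begin, end)`.

```java
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ReadRangeExample {
  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Placeholder bucket/object names for illustration only.
    BlobId blobId = BlobId.of("my-bucket", "my-object");
    long begin = 57;
    long end = 2384;

    ByteBuffer range = ByteBuffer.allocate(Math.toIntExact(end - begin));
    try (ReadChannel reader = storage.reader(blobId)) {
      reader.seek(begin); // first byte to read (inclusive)
      reader.limit(end);  // absolute offset to stop at (exclusive)
      reader.read(range);
    }
    range.flip();
    System.out.printf("read %d bytes%n", range.remaining());
  }
}
```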