Skip to content

Commit

Permalink
feat: Plumb PartNamingStrategy for Parallel Composite Uploads in Tran…
Browse files Browse the repository at this point in the history
…sfer Manager (#2547)

* feat: Plumb PartNamingStrategy for Parallel Composite Uploads in Transfer Manager

* address pr comments

* fix broken builds
  • Loading branch information
sydney-munro authored May 16, 2024
1 parent a63eccb commit 79d721d
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.storage.transfermanager;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.MoreObjects;
import java.util.Objects;
Expand All @@ -31,18 +34,22 @@ public final class TransferManagerConfig {
private final boolean allowDivideAndConquerDownload;
private final boolean allowParallelCompositeUpload;

private final PartNamingStrategy partNamingStrategy;

private final StorageOptions storageOptions;

TransferManagerConfig(
int maxWorkers,
int perWorkerBufferSize,
boolean allowDivideAndConquerDownload,
boolean allowParallelCompositeUpload,
PartNamingStrategy partNamingStrategy,
StorageOptions storageOptions) {
this.maxWorkers = maxWorkers;
this.perWorkerBufferSize = perWorkerBufferSize;
this.allowDivideAndConquerDownload = allowDivideAndConquerDownload;
this.allowParallelCompositeUpload = allowParallelCompositeUpload;
this.partNamingStrategy = partNamingStrategy;
this.storageOptions = storageOptions;
}

Expand Down Expand Up @@ -101,6 +108,15 @@ public StorageOptions getStorageOptions() {
return storageOptions;
}

/**
* Part Naming Strategy to be used during Parallel Composite Uploads
*
* @see Builder#setParallelCompositeUploadPartNamingStrategy(PartNamingStrategy)
*/
public PartNamingStrategy getParallelCompositeUploadPartNamingStrategy() {
return partNamingStrategy;
}

/** The service object for {@link TransferManager} */
public TransferManager getService() {
return new TransferManagerImpl(this, DefaultQos.of(this));
Expand Down Expand Up @@ -169,13 +185,15 @@ public static class Builder {
private boolean allowParallelCompositeUpload;

private StorageOptions storageOptions;
private PartNamingStrategy partNamingStrategy;

private Builder() {
this.perWorkerBufferSize = 16 * 1024 * 1024;
this.maxWorkers = 2 * Runtime.getRuntime().availableProcessors();
this.allowDivideAndConquerDownload = false;
this.allowParallelCompositeUpload = false;
this.storageOptions = StorageOptions.getDefaultInstance();
this.partNamingStrategy = PartNamingStrategy.noPrefix();
}

/**
Expand Down Expand Up @@ -246,6 +264,21 @@ public Builder setStorageOptions(StorageOptions storageOptions) {
return this;
}

/**
* Part Naming Strategy that Transfer Manager will use during Parallel Composite Upload
*
* <p><i>Default Value:</i> {@link PartNamingStrategy#noPrefix()}
*
* @return the instance of Builder with the value for PartNamingStrategy modified.
* @see TransferManagerConfig#getParallelCompositeUploadPartNamingStrategy()
*/
public Builder setParallelCompositeUploadPartNamingStrategy(
PartNamingStrategy partNamingStrategy) {
checkNotNull(partNamingStrategy);
this.partNamingStrategy = partNamingStrategy;
return this;
}

/**
* Creates a TransferManagerConfig object.
*
Expand All @@ -257,6 +290,7 @@ public TransferManagerConfig build() {
perWorkerBufferSize,
allowDivideAndConquerDownload,
allowParallelCompositeUpload,
partNamingStrategy,
storageOptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ final class TransferManagerImpl implements TransferManager {
.withBufferAllocationStrategy(
BufferAllocationStrategy.fixedPool(
transferManagerConfig.getMaxWorkers(),
transferManagerConfig.getPerWorkerBufferSize()));
transferManagerConfig.getPerWorkerBufferSize()))
.withPartNamingStrategy(
transferManagerConfig.getParallelCompositeUploadPartNamingStrategy());
storageOptions = storageOptions.toBuilder().setBlobWriteSessionConfig(pcuConfig).build();
}
this.pcuQueue = new ConcurrentLinkedDeque<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
Expand Down Expand Up @@ -141,6 +142,28 @@ public void uploadFiles() throws Exception {
}
}

@Test
public void uploadFilesPartNaming() throws Exception {
TransferManagerConfig config =
TransferManagerConfigTestingInstances.defaults(storage.getOptions())
.toBuilder()
.setAllowParallelCompositeUpload(true)
.setPerWorkerBufferSize(128 * 1024)
.setParallelCompositeUploadPartNamingStrategy(PartNamingStrategy.prefix("not-root"))
.build();
long size = CHUNK_THRESHOLD + 100L;
try (TransferManager transferManager = config.getService();
TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, size)) {
ParallelUploadConfig parallelUploadConfig =
ParallelUploadConfig.newBuilder().setBucketName(bucket.getName()).build();
UploadJob job =
transferManager.uploadFiles(
Collections.singletonList(tmpFile.getPath()), parallelUploadConfig);
List<UploadResult> uploadResults = job.getUploadResults();
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.SUCCESS);
}
}

@Test
public void uploadFilesWithOpts() throws Exception {
TransferManagerConfig config =
Expand Down

0 comments on commit 79d721d

Please sign in to comment.