Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameterize GoogleCloudStorage provider in GcsUtil to unblock gcs-co… #33368

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package org.apache.beam.sdk.extensions.gcp.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.storage.Storage;
import com.google.auth.Credentials;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -54,6 +59,15 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline

void setGoogleCloudStorageReadOptions(GoogleCloudStorageReadOptions value);

/**
 * The {@link GoogleCloudStorageProvider} that {@link GcsUtil} is handed in order to create its
 * {@link GoogleCloudStorage} client. If unset, the default is produced by {@link
 * GcsUtil.GoogleCloudStorageProviderFactory}.
 */
@JsonIgnore
@Description(
"The GoogleCloudStorageProvider instance that should be used to instantiate a GoogleCloudStorage client.")
@Default.InstanceFactory(GcsUtil.GoogleCloudStorageProviderFactory.class)
@Hidden
GoogleCloudStorageProvider getGoogleCloudStorageProvider();

/** Overrides the provider used to create the {@link GoogleCloudStorage} client. */
void setGoogleCloudStorageProvider(GoogleCloudStorageProvider value);

/**
* The ExecutorService instance to use to create threads, can be overridden to specify an
* ExecutorService that is compatible with the user's environment. If unset, the default is to use
Expand Down Expand Up @@ -208,4 +222,13 @@ public PathValidator create(PipelineOptions options) {
.build();
}
}

/**
 * Factory abstraction for {@link GoogleCloudStorage} clients. {@link GcsUtil} calls {@link #get}
 * whenever it needs a client, so supplying a different provider changes how that client is
 * constructed.
 */
@FunctionalInterface
public interface GoogleCloudStorageProvider {
/**
 * Creates a {@link GoogleCloudStorage} client.
 *
 * <p>Implementations are free to ignore arguments they do not need; the default provider in
 * {@code GcsUtil.GoogleCloudStorageProviderFactory} does not use {@code httpRequestInitializer}.
 *
 * @param options the storage options the client should be configured with
 * @param storage the underlying {@link Storage} API client
 * @param credentials the credentials to use for requests
 * @param httpRequestInitializer the HTTP request initializer held by the caller
 * @return the client to use for GCS operations
 */
GoogleCloudStorage get(
GoogleCloudStorageOptions options,
Storage storage,
Credentials credentials,
HttpRequestInitializer httpRequestInitializer);
Comment on lines +228 to +232
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 4 params should cover both the 2.x constructor and the 3.x Builder

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public GcsUtil create(PipelineOptions options) {
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
gcsOptions.getGoogleCloudStorageReadOptions(),
gcsOptions.getGoogleCloudStorageProvider());
}

/** Returns an instance of {@link GcsUtil} based on the given parameters. */
Expand All @@ -174,7 +175,8 @@ public static GcsUtil create(
Credentials credentials,
@Nullable Integer uploadBufferSizeBytes,
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
GoogleCloudStorageReadOptions gcsReadOptions,
GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider) {
return new GcsUtil(
storageClient,
httpRequestInitializer,
Expand All @@ -184,7 +186,17 @@ public static GcsUtil create(
uploadBufferSizeBytes,
null,
gcsCountersOptions,
gcsReadOptions);
gcsReadOptions,
googleCloudStorageProvider);
}
}

/**
 * Supplies the default {@link GcsOptions.GoogleCloudStorageProvider}: one that constructs a
 * {@link GoogleCloudStorageImpl} from the options, storage client and credentials it is given.
 */
public static class GoogleCloudStorageProviderFactory
    implements DefaultValueFactory<GcsOptions.GoogleCloudStorageProvider> {
  @Override
  public GcsOptions.GoogleCloudStorageProvider create(PipelineOptions options) {
    // The request initializer is part of the provider signature but is not needed by the
    // three-argument GoogleCloudStorageImpl constructor used here.
    GcsOptions.GoogleCloudStorageProvider defaultProvider =
        (gcsStorageOptions, storageClient, creds, requestInitializer) -> {
          return new GoogleCloudStorageImpl(gcsStorageOptions, storageClient, creds);
        };
    return defaultProvider;
  }
}

Expand Down Expand Up @@ -228,6 +240,7 @@ public static GcsUtil create(
private final Credentials credentials;

// Client obtained from the provider in the constructor.
private GoogleCloudStorage googleCloudStorage;
// Not final: tests replace it through setCloudStorageProviderImpl.
private GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider;
// Base options built in the constructor; create() derives per-write options from these.
private GoogleCloudStorageOptions googleCloudStorageOptions;

private final int rewriteDataOpBatchLimit;
Expand Down Expand Up @@ -261,22 +274,25 @@ public static boolean isWildcard(GcsPath spec) {
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
GoogleCloudStorageReadOptions gcsReadOptions,
GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider) {
this.storageClient = storageClient;
this.httpRequestInitializer = httpRequestInitializer;
this.uploadBufferSizeBytes = uploadBufferSizeBytes;
this.executorService = executorService;
this.credentials = credentials;
this.maxBytesRewrittenPerCall = null;
this.numRewriteTokensUsed = null;
this.googleCloudStorageProvider = googleCloudStorageProvider;
googleCloudStorageOptions =
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
.setReadChannelOptions(gcsReadOptions)
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage =
createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
googleCloudStorageProvider.get(
googleCloudStorageOptions, storageClient, credentials, httpRequestInitializer);
this.batchRequestSupplier =
() -> {
// Capture reference to this so that the most recent storageClient and initializer
Expand Down Expand Up @@ -509,6 +525,11 @@ void setCloudStorageImpl(GoogleCloudStorageOptions g) {
googleCloudStorageOptions = g;
}

/**
 * Test hook that swaps the provider consulted when a new {@link GoogleCloudStorage} client is
 * requested. The client already stored in {@code googleCloudStorage} is not rebuilt here.
 */
@VisibleForTesting
void setCloudStorageProviderImpl(GcsOptions.GoogleCloudStorageProvider p) {
  this.googleCloudStorageProvider = p;
}

/**
* Create an integer consumer that updates the counter identified by a prefix and a bucket name.
*/
Expand Down Expand Up @@ -695,8 +716,11 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO
GoogleCloudStorageOptions newGoogleCloudStorageOptions =
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
GoogleCloudStorage gcpStorage =
createGoogleCloudStorage(
newGoogleCloudStorageOptions, this.storageClient, this.credentials);
googleCloudStorageProvider.get(
newGoogleCloudStorageOptions,
this.storageClient,
this.credentials,
httpRequestInitializer);
StorageResourceId resourceId =
new StorageResourceId(
path.getBucket(),
Expand Down Expand Up @@ -737,11 +761,6 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO
}
}

/**
 * Builds a {@link GoogleCloudStorageImpl} from the given options, storage client and
 * credentials. Package-private so that the test mock can override it.
 */
GoogleCloudStorage createGoogleCloudStorage(
    GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
  GoogleCloudStorage client = new GoogleCloudStorageImpl(options, storage, credentials);
  return client;
}

/**
* Checks whether the GCS bucket exists. Similar to {@link #bucketAccessible(GcsPath)}, but throws
* exception if the bucket is inaccessible due to permissions or does not exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void testGcpCoreApiSurface() throws Exception {
classesInPackage("com.google.api.client.http"),
classesInPackage("com.google.api.client.json"),
classesInPackage("com.google.api.client.util"),
classesInPackage("com.google.cloud.hadoop.util"),
classesInPackage("com.google.api.services.storage"),
classesInPackage("com.google.auth"),
classesInPackage("com.fasterxml.jackson.annotation"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,8 @@ public static GcsUtilMock createMockWithMockStorage(PipelineOptions options, byt
gcsUtilMock.googleCloudStorage = googleCloudStorageMock;
// set the mock in the super object as well
gcsUtilMock.setCloudStorageImpl(gcsUtilMock.googleCloudStorage);
gcsUtilMock.setCloudStorageProviderImpl(
(storageOpts, storage, credentials, httpRequestInitializer) -> googleCloudStorageMock);

if (readPayload == null) {
Mockito.when(googleCloudStorageMock.create(Mockito.any(), Mockito.any()))
Expand Down Expand Up @@ -1657,7 +1659,8 @@ public static GcsUtilMock createMock(PipelineOptions options) {
gcsOptions.getEnableBucketWriteMetricCounter()
? gcsOptions.getGcsWriteCounterPrefix()
: null),
gcsOptions.getGoogleCloudStorageReadOptions());
gcsOptions.getGoogleCloudStorageReadOptions(),
gcsOptions.getGoogleCloudStorageProvider());
}

private GcsUtilMock(
Expand All @@ -1669,7 +1672,8 @@ private GcsUtilMock(
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions,
GoogleCloudStorageReadOptions gcsReadOptions) {
GoogleCloudStorageReadOptions gcsReadOptions,
GcsOptions.GoogleCloudStorageProvider googleCloudStorageProvider) {
super(
storageClient,
httpRequestInitializer,
Expand All @@ -1679,13 +1683,8 @@ private GcsUtilMock(
uploadBufferSizeBytes,
rewriteDataOpBatchLimit,
gcsCountersOptions,
gcsReadOptions);
}

@Override
GoogleCloudStorage createGoogleCloudStorage(
GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
return googleCloudStorage;
gcsReadOptions,
googleCloudStorageProvider);
}
}

Expand All @@ -1698,7 +1697,9 @@ public void testCreate() throws IOException {
GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class);
WritableByteChannel mockChannel = Mockito.mock(WritableByteChannel.class);

gcsUtil.googleCloudStorage = mockStorage;
gcsUtil.setCloudStorageImpl(mockStorage);
gcsUtil.setCloudStorageProviderImpl(
(options, storage, credentials, httpRequestInitializer) -> mockStorage);

when(mockStorage.create(any(), any())).thenReturn(mockChannel);

Expand All @@ -1716,7 +1717,9 @@ public void testCreateWithException() throws IOException {

GoogleCloudStorage mockStorage = Mockito.mock(GoogleCloudStorage.class);

gcsUtil.googleCloudStorage = mockStorage;
gcsUtil.setCloudStorageImpl(mockStorage);
gcsUtil.setCloudStorageProviderImpl(
(options, storage, credentials, httpRequestInitializer) -> mockStorage);

when(mockStorage.create(any(), any())).thenThrow(new RuntimeException("testException"));

Expand Down
Loading