diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 2d08f192beeb..4bbba491bf9f 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -186,11 +186,13 @@ task integrationTest(type: Test, dependsOn: processTestResources) { def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests' def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb' + def host = project.findProperty('host') ?: 'batch-firestore.googleapis.com:443' systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--runner=DirectRunner", "--project=${gcpProject}", "--tempRoot=${gcpTempRoot}", "--firestoreDb=${firestoreDb}", + "--host=${host}", ]) // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date" @@ -224,12 +226,14 @@ task integrationTestKms(type: Test) { def gcpTempRoot = project.findProperty('gcpTempRootKms') ?: 'gs://temp-storage-for-end-to-end-tests-cmek' def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: "projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test" def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb' + def host = project.findProperty('host') ?: 'batch-firestore.googleapis.com:443' systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--runner=DirectRunner", "--project=${gcpProject}", "--tempRoot=${gcpTempRoot}", "--dataflowKmsKey=${dataflowKmsKey}", "--firestoreDb=${firestoreDb}", + "--host=${host}", ]) // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date" diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java index edaf99d03409..1be6568372d9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java @@ -57,4 +57,22 @@ public interface FirestoreOptions extends PipelineOptions { /** Set the Firestore database ID to connect to. */ void setFirestoreDb(String firestoreDb); + + /** + * A host port pair to allow connecting to a Cloud Firestore instead of the default live service. + * + * @return the string representation of a host and port pair to be used when constructing Cloud + * Firestore clients. + */ + @Description("Firestore endpoint (host and port)") + @Default.String("batch-firestore.googleapis.com:443") + String getHost(); + + /** + * Define a host port pair to allow connecting to a Cloud Firestore instead of the default live + * service. + * + * @param host the host and port to connect to + */ + void setHost(String host); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index d8b8ba75a1d6..7c5a2a68090e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -99,7 +99,7 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { GcpOptions gcpOptions = options.as(GcpOptions.class); builder .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) - .setEndpoint("batch-firestore.googleapis.com:443"); + .setEndpoint(firestoreOptions.getHost()); } ClientContext clientContext = ClientContext.create(builder.build()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java index b6f1c93959e1..c37ae7a92965 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java @@ -92,12 +92,10 @@ abstract class BaseFirestoreIT { .build(); protected static String project; - protected GcpOptions options; @Before public void setup() { - options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); - project = options.getProject(); + project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); } private static Instant toWriteTime(WriteResult result) { @@ -166,7 +164,7 @@ public final void listCollections() throws Exception { .build()); PAssert.that(actualCollectionIds).containsInAnyOrder(allCollectionIds); - testPipeline.run(options); + testPipeline.run(TestPipeline.testingPipelineOptions()); // Reading from readTime should only get collection IDs written in the batch before readTime. PCollection actualCollectionIdsAtReadTime = @@ -181,7 +179,7 @@ public final void listCollections() throws Exception { .withRpcQosOptions(RPC_QOS_OPTIONS) .build()); PAssert.that(actualCollectionIdsAtReadTime).containsInAnyOrder(collectionIds); - testPipeline2.run(options); + testPipeline2.run(TestPipeline.testingPipelineOptions()); } @Test @@ -212,7 +210,7 @@ public final void listDocuments() throws Exception { .apply(ParDo.of(new DocumentToName())); PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths); - testPipeline.run(options); + testPipeline.run(TestPipeline.testingPipelineOptions()); // Reading from readTime should only get the documents written before readTime. PCollection listDocumentPathsAtReadTime = @@ -230,7 +228,7 @@ public final void listDocuments() throws Exception { PAssert.that(listDocumentPathsAtReadTime) .containsInAnyOrder(documentGenerator.expectedDocumentPaths()); - testPipeline2.run(options); + testPipeline2.run(TestPipeline.testingPipelineOptions()); } @Test @@ -264,7 +262,7 @@ public final void runQuery() throws Exception { .apply(ParDo.of(new DocumentToName())); PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths); - testPipeline.run(options); + testPipeline.run(TestPipeline.testingPipelineOptions()); // Reading from readTime should only get the documents written before readTime. PCollection listDocumentPathsAtReadTime = @@ -283,7 +281,7 @@ public final void runQuery() throws Exception { PAssert.that(listDocumentPathsAtReadTime) .containsInAnyOrder(documentGenerator.expectedDocumentPaths()); - testPipeline2.run(options); + testPipeline2.run(TestPipeline.testingPipelineOptions()); } @Test @@ -323,7 +321,7 @@ public final void partitionQuery() throws Exception { .apply(ParDo.of(new DocumentToName())); PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths); - testPipeline.run(options); + testPipeline.run(TestPipeline.testingPipelineOptions()); // Reading from readTime should only get the documents written before readTime. PCollection listDocumentPathsAtReadTime = @@ -343,7 +341,7 @@ public final void partitionQuery() throws Exception { PAssert.that(listDocumentPathsAtReadTime) .containsInAnyOrder(documentGenerator.expectedDocumentPaths()); - testPipeline2.run(options); + testPipeline2.run(TestPipeline.testingPipelineOptions()); } @Test @@ -387,7 +385,7 @@ public final void batchGet() throws Exception { .apply(ParDo.of(new DocumentToName())); PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths); - testPipeline.run(options); + testPipeline.run(TestPipeline.testingPipelineOptions()); // Reading from readTime should only get the documents written before readTime. PCollection listDocumentPathsAtReadTime = @@ -407,7 +405,7 @@ public final void batchGet() throws Exception { PAssert.that(listDocumentPathsAtReadTime) .containsInAnyOrder(documentGenerator.expectedDocumentPaths()); - testPipeline2.run(options); + testPipeline2.run(TestPipeline.testingPipelineOptions()); } @Test @@ -445,7 +443,7 @@ protected final void runWriteTest( .apply(createWrite) .apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(RPC_QOS_OPTIONS).build()); - testPipeline.run(options); + testPipeline.run(TestPipeline.testingPipelineOptions()); List actualDocumentIds = helper diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java index 90aba760b7f6..117489b100d7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java @@ -104,7 +104,7 @@ enum DataLayout { DataLayout value() default DataLayout.Shallow; } - private final GcpOptions options; + private final GcpOptions gcpOptions; private final org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions firestoreBeamOptions; private final FirestoreOptions firestoreOptions; @@ -125,17 +125,17 @@ enum DataLayout { "initialization.fields.uninitialized") // testClass and testName are managed via #apply public FirestoreTestingHelper(CleanupMode cleanupMode) { this.cleanupMode = cleanupMode; - options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); + gcpOptions = TestPipeline.testingPipelineOptions().as(GcpOptions.class); firestoreBeamOptions = TestPipeline.testingPipelineOptions() .as(org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions.class); firestoreOptions = FirestoreOptions.newBuilder() - .setCredentials(options.getGcpCredential()) - .setProjectId(options.getProject()) + .setCredentials(gcpOptions.getGcpCredential()) + .setProjectId(gcpOptions.getProject()) .setDatabaseId(firestoreBeamOptions.getFirestoreDb()) + .setHost(firestoreBeamOptions.getHost()) .build(); - fs = firestoreOptions.getService(); rpc = (FirestoreRpc) firestoreOptions.getRpc(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java index e259021be130..20898ecd767a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreV1IT.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure; import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -133,7 +134,7 @@ public void batchWrite_partialFailureOutputsToDeadLetterQueue() assertFalse(iterator.hasNext()); return null; }); - testPipeline.run(this.options); + testPipeline.run(TestPipeline.testingPipelineOptions()); ApiFuture actualDocsQuery = helper.getBaseDocument().collection(collectionId).orderBy("__name__").get();