Enable FirestoreV1IT to run against a non default host (#27256)
* Added a FIRESTORE_HOST env variable to allow an endpoint (i.e. host + port) other than the default to be used for testing. If FIRESTORE_HOST is set, it is used as the Firestore endpoint; otherwise the default endpoint is used.

* Fix format.

* Set host only if options don't have it set already.

* Used FIRESTORE_EMULATOR_HOST env var for emulator host.

* Updated to use host as argument in FirestoreV1IT

* Don't inject value back into FirestoreOptions

* Updates for review comments

* Removed unused import
SabaSathya authored Jul 26, 2023
1 parent 1f4defa commit d3ea232
Showing 6 changed files with 42 additions and 21 deletions.
4 changes: 4 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -186,11 +186,13 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb'
+ def host = project.findProperty('host') ?: 'batch-firestore.googleapis.com:443'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--firestoreDb=${firestoreDb}",
"--host=${host}",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
@@ -224,12 +226,14 @@ task integrationTestKms(type: Test) {
def gcpTempRoot = project.findProperty('gcpTempRootKms') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: "projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test"
def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb'
+ def host = project.findProperty('host') ?: 'batch-firestore.googleapis.com:443'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--dataflowKmsKey=${dataflowKmsKey}",
"--firestoreDb=${firestoreDb}",
"--host=${host}",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
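Both tasks feed the generated JSON to the ITs through the beamTestPipelineOptions system property, so targeting a non-default endpoint is just a matter of passing -Phost (the localhost value below is hypothetical, e.g. a local emulator). A minimal sketch of how a test picks the value up, using the FirestoreOptions pipeline options interface added below:

import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.testing.TestPipeline;

class HostPropertyExample { // hypothetical helper, for illustration only
  static String resolveEndpoint() {
    // TestPipeline.testingPipelineOptions() parses the "beamTestPipelineOptions"
    // system property populated by the Gradle tasks above.
    FirestoreOptions options =
        TestPipeline.testingPipelineOptions().as(FirestoreOptions.class);
    return options.getHost(); // 'batch-firestore.googleapis.com:443' unless -Phost overrides it
  }
}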
FirestoreOptions.java
@@ -57,4 +57,22 @@ public interface FirestoreOptions extends PipelineOptions {

/** Set the Firestore database ID to connect to. */
void setFirestoreDb(String firestoreDb);

+ /**
+ * A host port pair to allow connecting to a Cloud Firestore instead of the default live service.
+ *
+ * @return the string representation of a host and port pair to be used when constructing Cloud
+ * Firestore clients.
+ */
+ @Description("Firestore endpoint (host and port)")
+ @Default.String("batch-firestore.googleapis.com:443")
+ String getHost();
+
+ /**
+ * Define a host port pair to allow connecting to a Cloud Firestore instead of the default live
+ * service.
+ *
+ * @param host the host and port to connect to
+ */
+ void setHost(String host);
}
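Outside the test harness the new option behaves like any other pipeline option: the @Default.String value applies unless --host is passed. A hedged sketch (the endpoint below is hypothetical):

import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class HostDefaultExample { // illustration only, not part of this commit
  public static void main(String[] args) {
    // With --host: the connector targets the given endpoint, e.g. an emulator.
    FirestoreOptions custom =
        PipelineOptionsFactory.fromArgs("--host=localhost:8080").as(FirestoreOptions.class);
    System.out.println(custom.getHost()); // localhost:8080

    // Without --host: the @Default.String value above is returned.
    FirestoreOptions defaults = PipelineOptionsFactory.create().as(FirestoreOptions.class);
    System.out.println(defaults.getHost()); // batch-firestore.googleapis.com:443
  }
}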
FirestoreStatefulComponentFactory.java
@@ -99,7 +99,7 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
GcpOptions gcpOptions = options.as(GcpOptions.class);
builder
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
.setEndpoint("batch-firestore.googleapis.com:443");
.setEndpoint(firestoreOptions.getHost());
}

ClientContext clientContext = ClientContext.create(builder.build());
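The builder here is a gax-style settings builder, so swapping the endpoint is the standard gax override. For comparison, a sketch of the same pattern against the generated v1 client directly (hypothetical emulator endpoint; this is not the Beam code):

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.cloud.firestore.v1.FirestoreSettings;

class EndpointOverrideExample { // illustration only
  static FirestoreSettings emulatorSettings() throws java.io.IOException {
    // Same shape as getFirestoreStub(...) above: a credentials provider plus an endpoint,
    // here aimed at a local emulator instead of the live batch endpoint.
    return FirestoreSettings.newBuilder()
        .setCredentialsProvider(NoCredentialsProvider.create()) // emulators don't check credentials
        .setEndpoint("localhost:8080") // hypothetical host:port
        .build();
  }
}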
BaseFirestoreIT.java
@@ -92,12 +92,10 @@ abstract class BaseFirestoreIT {
.build();

protected static String project;
- protected GcpOptions options;

@Before
public void setup() {
- options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
- project = options.getProject();
+ project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
}

private static Instant toWriteTime(WriteResult result) {
@@ -166,7 +164,7 @@ public final void listCollections() throws Exception {
.build());

PAssert.that(actualCollectionIds).containsInAnyOrder(allCollectionIds);
- testPipeline.run(options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

// Reading from readTime should only get collection IDs written in the batch before readTime.
PCollection<String> actualCollectionIdsAtReadTime =
@@ -181,7 +179,7 @@ public final void listCollections() throws Exception {
.withRpcQosOptions(RPC_QOS_OPTIONS)
.build());
PAssert.that(actualCollectionIdsAtReadTime).containsInAnyOrder(collectionIds);
- testPipeline2.run(options);
+ testPipeline2.run(TestPipeline.testingPipelineOptions());
}

@Test
@@ -212,7 +210,7 @@ public final void listDocuments() throws Exception {
.apply(ParDo.of(new DocumentToName()));

PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths);
- testPipeline.run(options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

// Reading from readTime should only get the documents written before readTime.
PCollection<String> listDocumentPathsAtReadTime =
@@ -230,7 +228,7 @@ public final void listDocuments() throws Exception {

PAssert.that(listDocumentPathsAtReadTime)
.containsInAnyOrder(documentGenerator.expectedDocumentPaths());
- testPipeline2.run(options);
+ testPipeline2.run(TestPipeline.testingPipelineOptions());
}

@Test
@@ -264,7 +262,7 @@ public final void runQuery() throws Exception {
.apply(ParDo.of(new DocumentToName()));

PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths);
- testPipeline.run(options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

// Reading from readTime should only get the documents written before readTime.
PCollection<String> listDocumentPathsAtReadTime =
@@ -283,7 +281,7 @@ public final void runQuery() throws Exception {

PAssert.that(listDocumentPathsAtReadTime)
.containsInAnyOrder(documentGenerator.expectedDocumentPaths());
- testPipeline2.run(options);
+ testPipeline2.run(TestPipeline.testingPipelineOptions());
}

@Test
@@ -323,7 +321,7 @@ public final void partitionQuery() throws Exception {
.apply(ParDo.of(new DocumentToName()));

PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths);
- testPipeline.run(options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

// Reading from readTime should only get the documents written before readTime.
PCollection<String> listDocumentPathsAtReadTime =
@@ -343,7 +341,7 @@ public final void partitionQuery() throws Exception {

PAssert.that(listDocumentPathsAtReadTime)
.containsInAnyOrder(documentGenerator.expectedDocumentPaths());
- testPipeline2.run(options);
+ testPipeline2.run(TestPipeline.testingPipelineOptions());
}

@Test
@@ -387,7 +385,7 @@ public final void batchGet() throws Exception {
.apply(ParDo.of(new DocumentToName()));

PAssert.that(listDocumentPaths).containsInAnyOrder(allDocumentPaths);
- testPipeline.run(options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

// Reading from readTime should only get the documents written before readTime.
PCollection<String> listDocumentPathsAtReadTime =
@@ -407,7 +405,7 @@ public final void batchGet() throws Exception {

PAssert.that(listDocumentPathsAtReadTime)
.containsInAnyOrder(documentGenerator.expectedDocumentPaths());
- testPipeline2.run(options);
+ testPipeline2.run(TestPipeline.testingPipelineOptions());
}

@Test
@@ -445,7 +443,7 @@ protected final void runWriteTest(
.apply(createWrite)
.apply(FirestoreIO.v1().write().batchWrite().withRpcQosOptions(RPC_QOS_OPTIONS).build());

- testPipeline.run(options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

List<String> actualDocumentIds =
helper
FirestoreTestingHelper.java
@@ -104,7 +104,7 @@ enum DataLayout {
DataLayout value() default DataLayout.Shallow;
}

- private final GcpOptions options;
+ private final GcpOptions gcpOptions;
private final org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions firestoreBeamOptions;
private final FirestoreOptions firestoreOptions;

@@ -125,17 +125,17 @@
"initialization.fields.uninitialized") // testClass and testName are managed via #apply
public FirestoreTestingHelper(CleanupMode cleanupMode) {
this.cleanupMode = cleanupMode;
- options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+ gcpOptions = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
firestoreBeamOptions =
TestPipeline.testingPipelineOptions()
.as(org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions.class);
firestoreOptions =
FirestoreOptions.newBuilder()
- .setCredentials(options.getGcpCredential())
- .setProjectId(options.getProject())
+ .setCredentials(gcpOptions.getGcpCredential())
+ .setProjectId(gcpOptions.getProject())
.setDatabaseId(firestoreBeamOptions.getFirestoreDb())
+ .setHost(firestoreBeamOptions.getHost())
.build();

fs = firestoreOptions.getService();
rpc = (FirestoreRpc) firestoreOptions.getRpc();
}
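With setHost wired through, the helper's out-of-band reads and writes hit the same endpoint as the connector under test. A standalone sketch of the equivalent client construction (note this FirestoreOptions is com.google.cloud.firestore.FirestoreOptions, not the Beam interface; project, database, and host values are hypothetical):

import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;

class HelperStyleClient { // illustration only
  static Firestore client() {
    // Mirrors the FirestoreTestingHelper wiring above, minus the Beam options plumbing.
    return FirestoreOptions.newBuilder()
        .setProjectId("my-test-project") // hypothetical
        .setDatabaseId("firestoredb") // the ITs' default database id
        .setHost("localhost:8080") // hypothetical non-default endpoint
        .build()
        .getService();
  }
}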
FirestoreV1IT.java
@@ -55,6 +55,7 @@
import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure;
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
import org.apache.beam.sdk.testing.PAssert;
+ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -133,7 +134,7 @@ public void batchWrite_partialFailureOutputsToDeadLetterQueue()
assertFalse(iterator.hasNext());
return null;
});
- testPipeline.run(this.options);
+ testPipeline.run(TestPipeline.testingPipelineOptions());

ApiFuture<QuerySnapshot> actualDocsQuery =
helper.getBaseDocument().collection(collectionId).orderBy("__name__").get();
