Skip to content

Commit

Permalink
Fix routing header issue in Firestore Beam connector (#27858)
Browse files Browse the repository at this point in the history
* Fix routing header issue in Firestore Beam connector

* Apply the lint

---------

Co-authored-by: Sichen Liu <[email protected]>
  • Loading branch information
pl04351820 and Sichen Liu authored Aug 7, 2023
1 parent 347f84c commit 23d44a4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
4 changes: 0 additions & 4 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,6 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
exclude '**/BigQueryToTableIT.class'
exclude '**/BigQueryIOJsonTest.class'

// Failing due to Firestore service-side changes
// https://github.com/apache/beam/issues/25851
exclude '**/firestore/it/**/*.class'

maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,7 @@ private FirestoreStatefulComponentFactory() {}
*/
FirestoreStub getFirestoreStub(PipelineOptions options) {
try {
FirestoreSettings.Builder builder =
FirestoreSettings.newBuilder()
.setHeaderProvider(
new FixedHeaderProvider() {
@Override
public Map<@NonNull String, @NonNull String> getHeaders() {
return ImmutableMap.of("User-Agent", options.getUserAgent());
}
});
FirestoreSettings.Builder builder = FirestoreSettings.newBuilder();

RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(1).build();

Expand All @@ -86,6 +78,8 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {

FirestoreOptions firestoreOptions = options.as(FirestoreOptions.class);
String emulatorHostPort = firestoreOptions.getEmulatorHost();
ImmutableMap.Builder<String, String> headers = ImmutableMap.builder();
headers.put("User-Agent", options.getUserAgent());
if (emulatorHostPort != null) {
builder
.setCredentialsProvider(FixedCredentialsProvider.create(new EmulatorCredentials()))
Expand All @@ -100,8 +94,22 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
builder
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
.setEndpoint(firestoreOptions.getHost());
headers.put(
"x-goog-request-params",
"project_id="
+ gcpOptions.getProject()
+ "&database_id="
+ firestoreOptions.getFirestoreDb());
}

builder.setHeaderProvider(
new FixedHeaderProvider() {
@Override
public Map<@NonNull String, @NonNull String> getHeaders() {
return headers.build();
}
});

ClientContext clientContext = ClientContext.create(builder.build());
return GrpcFirestoreStub.create(clientContext);
} catch (Exception e) {
Expand Down

0 comments on commit 23d44a4

Please sign in to comment.