From 23d44a45d68cc0438faf46fbedcb8499efbf7c8d Mon Sep 17 00:00:00 2001 From: Jing Date: Mon, 7 Aug 2023 10:25:22 -0700 Subject: [PATCH] Fix routing header issue in Firestore Beam connector (#27858) * Fix routing header issue in Firestore Beam connector * Apply the lint --------- Co-authored-by: Sichen Liu --- .../io/google-cloud-platform/build.gradle | 4 --- .../FirestoreStatefulComponentFactory.java | 26 ++++++++++++------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 4bbba491bf9f..ee2e86ce0fef 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -207,10 +207,6 @@ task integrationTest(type: Test, dependsOn: processTestResources) { exclude '**/BigQueryToTableIT.class' exclude '**/BigQueryIOJsonTest.class' - // Failing due to Firestore service-side changes - // https://github.com/apache/beam/issues/25851 - exclude '**/firestore/it/**/*.class' - maxParallelForks 4 classpath = sourceSets.test.runtimeClasspath testClassesDirs = sourceSets.test.output.classesDirs diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 7c5a2a68090e..ace70fc77f44 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -66,15 +66,7 @@ private FirestoreStatefulComponentFactory() {} */ FirestoreStub getFirestoreStub(PipelineOptions options) { try { - FirestoreSettings.Builder builder = - FirestoreSettings.newBuilder() - .setHeaderProvider( - new FixedHeaderProvider() { - @Override - public Map<@NonNull String, @NonNull String> getHeaders() { - return ImmutableMap.of("User-Agent", options.getUserAgent()); - } - }); + FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(1).build(); @@ -86,6 +78,8 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { FirestoreOptions firestoreOptions = options.as(FirestoreOptions.class); String emulatorHostPort = firestoreOptions.getEmulatorHost(); + ImmutableMap.Builder headers = ImmutableMap.builder(); + headers.put("User-Agent", options.getUserAgent()); if (emulatorHostPort != null) { builder .setCredentialsProvider(FixedCredentialsProvider.create(new EmulatorCredentials())) @@ -100,8 +94,22 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { builder .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) .setEndpoint(firestoreOptions.getHost()); + headers.put( + "x-goog-request-params", + "project_id=" + + gcpOptions.getProject() + + "&database_id=" + + firestoreOptions.getFirestoreDb()); } + builder.setHeaderProvider( + new FixedHeaderProvider() { + @Override + public Map<@NonNull String, @NonNull String> getHeaders() { + return headers.build(); + } + }); + ClientContext clientContext = ClientContext.create(builder.build()); return GrpcFirestoreStub.create(clientContext); } catch (Exception e) {