From dd5bbb2d77d7f3d057baad3d872677671c730345 Mon Sep 17 00:00:00 2001 From: scwhittle Date: Tue, 16 Jan 2024 09:56:10 +0100 Subject: [PATCH] Add a read timeout and cache BigQueryIOMetadata (#29662) --- .../extensions/gcp/util/GceMetadataUtil.java | 1 + .../io/gcp/bigquery/BigQueryIOMetadata.java | 38 ++++++++++--------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java index fd49b759fd6d..e63aa7dc677a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java @@ -44,6 +44,7 @@ static String fetchMetadata(String key) { int timeoutMillis = 5000; final HttpParams httpParams = new BasicHttpParams(); HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis); + HttpConnectionParams.setSoTimeout(httpParams, timeoutMillis); String ret = ""; try { HttpClient client = new DefaultHttpClient(httpParams); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java index 9cce436fe351..f8d261d3bf66 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java @@ -18,19 +18,25 @@ package org.apache.beam.sdk.io.gcp.bigquery; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.checkerframework.checker.nullness.qual.Nullable; /** Metadata class for BigQueryIO. i.e. to use as BQ job labels. */ final class BigQueryIOMetadata { - private @Nullable String beamJobId; + private final @Nullable String beamJobId; - private @Nullable String beamJobName; + private final @Nullable String beamJobName; - private @Nullable String beamWorkerId; + private final @Nullable String beamWorkerId; + + static final Supplier INSTANCE = + Suppliers.memoizeWithExpiration(() -> refreshInstance(), 5, TimeUnit.MINUTES); private BigQueryIOMetadata( @Nullable String beamJobId, @Nullable String beamJobName, @Nullable String beamWorkerId) { @@ -47,25 +53,21 @@ private BigQueryIOMetadata( * being used. */ public static BigQueryIOMetadata create() { - String dataflowJobId = GceMetadataUtil.fetchDataflowJobId(); - String dataflowJobName = GceMetadataUtil.fetchDataflowJobName(); - String dataflowWorkerId = GceMetadataUtil.fetchDataflowWorkerId(); + return INSTANCE.get(); + } + private static BigQueryIOMetadata refreshInstance() { + String dataflowJobId = GceMetadataUtil.fetchDataflowJobId(); // If a Dataflow job id is returned on GCE metadata. Then it means // this program is running on a Dataflow GCE VM. - boolean isDataflowRunner = !dataflowJobId.isEmpty(); - - String beamJobId = null; - String beamJobName = null; - String beamWorkerId = null; - if (isDataflowRunner) { - if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) { - beamJobId = dataflowJobId; - beamJobName = dataflowJobName; - beamWorkerId = dataflowWorkerId; - } + if (dataflowJobId.isEmpty() || !BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) { + return new BigQueryIOMetadata(null, null, null); } - return new BigQueryIOMetadata(beamJobId, beamJobName, beamWorkerId); + + return new BigQueryIOMetadata( + dataflowJobId, + GceMetadataUtil.fetchDataflowJobName(), + GceMetadataUtil.fetchDataflowWorkerId()); } public Map addAdditionalJobLabels(Map jobLabels) {