From e5cd7df54e2f6af12c240268a91e0afc2ec27a8a Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Wed, 16 Nov 2022 13:17:33 -0800 Subject: [PATCH] feat: fix windows build failure by using nanoSeconds instead of Instant for better accuracy. (#1887) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build Co-authored-by: Owl Bot --- .../cloud/bigquery/storage/v1/ConnectionWorker.java | 7 +++---- .../cloud/bigquery/storage/v1/ConnectionWorkerPool.java | 3 +-- .../google/cloud/bigquery/storage/v1/StreamWriter.java | 9 ++++----- .../cloud/bigquery/storage/v1/JsonStreamWriterTest.java | 2 -- .../cloud/bigquery/storage/v1/StreamWriterTest.java | 3 --- 5 files changed, 8 insertions(+), 16 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 32a8c948e0..81e14d53a5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -31,7 +31,6 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.time.Instant; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; @@ -610,7 +609,7 @@ private void requestCallback(AppendRowsResponse response) { this.lock.lock(); if (response.hasUpdatedSchema()) { this.updatedSchema = - TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()); + TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()); } try { // Had a successful connection with at least one result, reset retries. @@ -824,12 +823,12 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { @AutoValue abstract static class TableSchemaAndTimestamp { // Shows the timestamp updated schema is reported from response - abstract Instant updateTimeStamp(); + abstract long updateTimeStamp(); // The updated schema returned from server. abstract TableSchema updatedSchema(); - static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) { + static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) { return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema); } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index dea49b62db..e119f4c560 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; -import java.time.Instant; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -268,7 +267,7 @@ public ApiFuture append( if (response.getWriteStream() != "" && response.hasUpdatedSchema()) { tableNameToUpdatedSchema.put( response.getWriteStream(), - TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema())); + TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema())); } return response; }, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 744839f3db..f9e6186edc 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -29,7 +29,6 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -88,7 +87,7 @@ public class StreamWriter implements AutoCloseable { new ConcurrentHashMap<>(); /** Creation timestamp of this streamwriter */ - private final Instant creationTimestamp; + private final long creationTimestamp; /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { @@ -260,7 +259,7 @@ private StreamWriter(Builder builder) throws IOException { client.close(); } } - this.creationTimestamp = Instant.now(); + this.creationTimestamp = System.nanoTime(); } @VisibleForTesting @@ -414,12 +413,12 @@ public synchronized TableSchema getUpdatedSchema() { if (tableSchemaAndTimestamp == null) { return null; } - return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0 + return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp() ? tableSchemaAndTimestamp.updatedSchema() : null; } - Instant getCreationTimestamp() { + long getCreationTimestamp() { return creationTimestamp; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 6fc0936ee4..258a287a1c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -530,8 +530,6 @@ public void run() throws Throwable { public void testSimpleSchemaUpdate() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // Sleep for a short period to make sure the creation timestamp is older. - Sleeper.DEFAULT.sleep(200); testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() .setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index e59b40e92b..134b438593 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.NoCredentialsProvider; @@ -310,8 +309,6 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing) AppendRowsResponse response = writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get(); assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA); - // Sleep for a short period to make sure the creation timestamp is older. - Sleeper.DEFAULT.sleep(200); // Create another writer, although it's the same stream name but the time stamp is newer, thus // the old updated schema won't get returned.