diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 32a8c948e0..81e14d53a5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -31,7 +31,6 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.time.Instant; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; @@ -610,7 +609,7 @@ private void requestCallback(AppendRowsResponse response) { this.lock.lock(); if (response.hasUpdatedSchema()) { this.updatedSchema = - TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()); + TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()); } try { // Had a successful connection with at least one result, reset retries. @@ -824,12 +823,12 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { @AutoValue abstract static class TableSchemaAndTimestamp { // Shows the timestamp updated schema is reported from response - abstract Instant updateTimeStamp(); + abstract long updateTimeStamp(); // The updated schema returned from server. abstract TableSchema updatedSchema(); - static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) { + static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) { return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema); } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index dea49b62db..e119f4c560 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; -import java.time.Instant; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -268,7 +267,7 @@ public ApiFuture append( if (response.getWriteStream() != "" && response.hasUpdatedSchema()) { tableNameToUpdatedSchema.put( response.getWriteStream(), - TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema())); + TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema())); } return response; }, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 744839f3db..f9e6186edc 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -29,7 +29,6 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -88,7 +87,7 @@ public class StreamWriter implements AutoCloseable { new ConcurrentHashMap<>(); /** Creation timestamp of this streamwriter */ - private final Instant creationTimestamp; + private final long creationTimestamp; /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { @@ -260,7 +259,7 @@ private StreamWriter(Builder builder) throws IOException { client.close(); } } - this.creationTimestamp = Instant.now(); + this.creationTimestamp = System.nanoTime(); } @VisibleForTesting @@ -414,12 +413,12 @@ public synchronized TableSchema getUpdatedSchema() { if (tableSchemaAndTimestamp == null) { return null; } - return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0 + return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp() ? tableSchemaAndTimestamp.updatedSchema() : null; } - Instant getCreationTimestamp() { + long getCreationTimestamp() { return creationTimestamp; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 6fc0936ee4..258a287a1c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -530,8 +530,6 @@ public void run() throws Throwable { public void testSimpleSchemaUpdate() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // Sleep for a short period to make sure the creation timestamp is older. - Sleeper.DEFAULT.sleep(200); testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() .setAppendResult( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index e59b40e92b..134b438593 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import com.google.api.client.util.Sleeper; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.NoCredentialsProvider; @@ -310,8 +309,6 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing) AppendRowsResponse response = writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get(); assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA); - // Sleep for a short period to make sure the creation timestamp is older. - Sleeper.DEFAULT.sleep(200); // Create another writer, although it's the same stream name but the time stamp is newer, thus // the old updated schema won't get returned.