From 3d9ea575056d1a71766d83add2f02965a2a63927 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 4 Feb 2022 16:50:56 -0800 Subject: [PATCH 1/4] feat: Add a indicator of how much time a request is waiting in for the inflight limit. --- .../bigquery/storage/v1/JsonStreamWriter.java | 11 +++++++++++ .../bigquery/storage/v1/StreamWriter.java | 19 +++++++++++++++++++ .../bigquery/storage/v1/StreamWriterTest.java | 4 +--- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index a16273db44..ba4d13da7d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -164,6 +164,17 @@ public Descriptor getDescriptor() { return this.descriptor; } + /** + * Returns the wait of a request in Client side before sending to the Server. Request could wait + * in Client because it reached the client side inflightRequest limit (adjustable when + * constructing the StreamWriter). The value is the wait time for the last sent request. A + * constant high wait value indicates a need for more throughput, you can create a new + * StreamWriter for a new connection to increase the throughput. + */ + public long getInflightWaitSeconds() { + return streamWriter.getInflightWaitSeconds(); + } + /** Sets all StreamWriter settings. */ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 3b8c47585d..3b8f5bf897 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -34,6 +34,7 @@ import java.util.Deque; import java.util.LinkedList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -159,6 +160,11 @@ public class StreamWriter implements AutoCloseable { */ private Thread appendThread; + /* + * The inflight wait time for the previous sent request. + */ + private final AtomicLong inflightWaitSec = new AtomicLong(0); + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -316,6 +322,7 @@ private ApiFuture appendInternal(AppendRowsRequest message) @GuardedBy("lock") private void maybeWaitForInflightQuota() { + long start_time = System.currentTimeMillis(); while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) { try { @@ -332,6 +339,18 @@ private void maybeWaitForInflightQuota() { .withDescription("Interrupted while waiting for quota.")); } } + inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); + } + + // Returns the wait of a request in Client side before sending to the Server. Request could wait + // in Client because + // it reached the client side inflightRequest limit (adjustable when constructing the + // StreamWriter). + // The value is the wait time for the last sent request. A constant high wait value indicates a + // need for more + // throughput, you can create a new StreamWriter for a new connection to increase the throughput. + public long getInflightWaitSeconds() { + return inflightWaitSec.longValue(); } /** Close the stream writer. Shut down all resources. */ diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 692f6ce9bc..e166fc66b5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -547,10 +547,8 @@ public void testOneMaxInflightRequests() throws Exception { testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1)); testBigQueryWrite.addResponse(createAppendResponse(0)); - long appendStartTimeMs = System.currentTimeMillis(); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs; - assertTrue(appendElapsedMs >= 1000); + assertTrue(writer.getInflightWaitSeconds() >= 1); assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); writer.close(); } From 28b9dc6838982e59d7a545a787366ca423a44a95 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 4 Feb 2022 16:55:28 -0800 Subject: [PATCH 2/4] . --- .../bigquery/storage/v1/JsonStreamWriter.java | 6 +++--- .../cloud/bigquery/storage/v1/StreamWriter.java | 15 ++++++++------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index ba4d13da7d..ab08aeb666 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -167,9 +167,9 @@ public Descriptor getDescriptor() { /** * Returns the wait of a request in Client side before sending to the Server. Request could wait * in Client because it reached the client side inflightRequest limit (adjustable when - * constructing the StreamWriter). The value is the wait time for the last sent request. A - * constant high wait value indicates a need for more throughput, you can create a new - * StreamWriter for a new connection to increase the throughput. + * constructing the Writer). The value is the wait time for the last sent request. A constant high + * wait value indicates a need for more throughput, you can create a new Stream for to increase + * the throughput in exclusive stream case, or create a new Writer in the default stream case. */ public long getInflightWaitSeconds() { return streamWriter.getInflightWaitSeconds(); diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 3b8f5bf897..01a8e9acc9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -342,13 +342,14 @@ private void maybeWaitForInflightQuota() { inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); } - // Returns the wait of a request in Client side before sending to the Server. Request could wait - // in Client because - // it reached the client side inflightRequest limit (adjustable when constructing the - // StreamWriter). - // The value is the wait time for the last sent request. A constant high wait value indicates a - // need for more - // throughput, you can create a new StreamWriter for a new connection to increase the throughput. + /** + * Returns the wait of a request in Client side before sending to the Server. Request could wait + * in Client because it reached the client side inflightRequest limit (adjustable when + * constructing the StreamWriter). The value is the wait time for the last sent request. A + * constant high wait value indicates a need for more throughput, you can create a new Stream for + * to increase the throughput in exclusive stream case, or create a new Writer in the default + * stream case. + */ public long getInflightWaitSeconds() { return inflightWaitSec.longValue(); } From 7d6b9a44bf79b79fc34036c85f849d04f7fa24d5 Mon Sep 17 00:00:00 2001 From: yirutang Date: Fri, 4 Feb 2022 17:36:25 -0800 Subject: [PATCH 3/4] . --- .../com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java | 2 +- .../java/com/google/cloud/bigquery/storage/v1/StreamWriter.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index ab08aeb666..d755c10524 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -166,7 +166,7 @@ public Descriptor getDescriptor() { /** * Returns the wait of a request in Client side before sending to the Server. Request could wait - * in Client because it reached the client side inflightRequest limit (adjustable when + * in Client because it reached the client side inflight request limit (adjustable when * constructing the Writer). The value is the wait time for the last sent request. A constant high * wait value indicates a need for more throughput, you can create a new Stream for to increase * the throughput in exclusive stream case, or create a new Writer in the default stream case. diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 01a8e9acc9..cfca94fa86 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -344,7 +344,7 @@ private void maybeWaitForInflightQuota() { /** * Returns the wait of a request in Client side before sending to the Server. Request could wait - * in Client because it reached the client side inflightRequest limit (adjustable when + * in Client because it reached the client side inflight request limit (adjustable when * constructing the StreamWriter). The value is the wait time for the last sent request. A * constant high wait value indicates a need for more throughput, you can create a new Stream for * to increase the throughput in exclusive stream case, or create a new Writer in the default From 59337c70baadd9f5a421c2845f9965bab6c9dae8 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 7 Feb 2022 15:59:35 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 40ca391291..593a36f589 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies ```Groovy -implementation platform('com.google.cloud:libraries-bom:24.1.1') +implementation platform('com.google.cloud:libraries-bom:24.2.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.7.0' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.8.4' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.7.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.8.4" ``` ## Authentication