From 3baa84e96671a14936d1667d0e036a1565fa5b7a Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Mon, 2 May 2022 11:22:14 -0700 Subject: [PATCH] fix: A hang when the client fails to get DoneCallback (#1637) Add a timeout of one minute when waiting for the done callback to be called. This is the same timeout as client close. The DoneCallback mainly gives back the server-side error status, so it is not critical. In the Dataflow connector, we saw a hang because the DoneCallback was lost and we waited forever on it. Stack trace in b/230501926 --- .../bigquery/storage/v1/StreamWriter.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 03b1c69573..01706a5e14 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -465,7 +465,7 @@ private void appendLoop() { // We can close the stream connection and handle the remaining inflight requests. if (streamConnection != null) { this.streamConnection.close(); - waitForDoneCallback(); + waitForDoneCallback(1, TimeUnit.MINUTES); } // At this point, there cannot be more callback. It is safe to clean up all inflight requests. @@ -491,9 +491,10 @@ private boolean waitingQueueDrained() { } } - private void waitForDoneCallback() { + private void waitForDoneCallback(long duration, TimeUnit timeUnit) { log.fine("Waiting for done callback from stream connection. 
Stream: " + streamName); - while (true) { + long deadline = System.nanoTime() + timeUnit.toNanos(duration); + while (System.nanoTime() <= deadline) { this.lock.lock(); try { if (connectionFinalStatus != null) { @@ -505,6 +506,19 @@ private void waitForDoneCallback() { } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } + this.lock.lock(); + try { + if (connectionFinalStatus == null) { + connectionFinalStatus = + new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withDescription("Timeout waiting for DoneCallback.")); + } + } finally { + this.lock.unlock(); + } + + return; } private AppendRowsRequest prepareRequestBasedOnPosition(