From f8d1bd901232a61ca87c2671e478136dfd4f2432 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 27 Jul 2022 12:34:05 -0700 Subject: [PATCH] feat: Add another StreamWriterClosedException and remove RETRY_THRESHOLD (#1713) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Add another StreamWriterClosedException and remove RETRY_THRESHOLD * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * . Co-authored-by: Owl Bot --- .../bigquery/storage/v1/StreamWriter.java | 14 ++++--------- .../bigquery/storage/v1/StreamWriterTest.java | 21 ++++++++++++++++++- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 27bffbde69..75bfcdf3b5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -118,11 +118,6 @@ public class StreamWriter implements AutoCloseable { @GuardedBy("lock") private boolean inflightCleanuped = false; - /* - * Retry threshold, limits how often the connection is retried before processing halts. - */ - private static final long RETRY_THRESHOLD = 3; - /* * Indicates whether user has called Close() or not. */ @@ -327,10 +322,11 @@ private ApiFuture appendInternal(AppendRowsRequest message) if (connectionFinalStatus != null) { requestWrapper.appendResult.setException( - new StatusRuntimeException( + new Exceptions.StreamWriterClosedException( Status.fromCode(Status.Code.FAILED_PRECONDITION) .withDescription( - "Connection is closed due to " + connectionFinalStatus.toString()))); + "Connection is closed due to " + connectionFinalStatus.toString()), + streamName)); return requestWrapper.appendResult; } @@ -653,9 +649,7 @@ private void doneCallback(Throwable finalStatus) { this.streamConnectionIsConnected = false; if (connectionFinalStatus == null) { // If the error can be retried, don't set it here, let it try to retry later on. - if (isRetriableError(finalStatus) - && conectionRetryCountWithoutCallback < RETRY_THRESHOLD - && !userClosed) { + if (isRetriableError(finalStatus) && !userClosed) { this.conectionRetryCountWithoutCallback++; log.fine( "Retriable error " diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index a4330be263..1ff0c65c69 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -655,14 +655,33 @@ public void testWriterClosedStream() throws Exception { } @Test - public void testWriterException() throws Exception { + public void testWriterAlreadyClosedException() throws Exception { StreamWriter writer = getTestStreamWriter(); writer.close(); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 0); Exceptions.StreamWriterClosedException actualError = assertFutureException(Exceptions.StreamWriterClosedException.class, appendFuture1); // The basic StatusRuntimeException API is not changed. + assertTrue(actualError instanceof StatusRuntimeException); assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode()); assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed")); } + + @Test + public void testWriterClosedException() throws Exception { + StreamWriter writer = getTestStreamWriter(); + testBigQueryWrite.addException(Status.INTERNAL.asException()); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 0); + try { + appendFuture1.get(); + } catch (Exception e) { + } + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"A"}, 0); + Exceptions.StreamWriterClosedException actualError = + assertFutureException(Exceptions.StreamWriterClosedException.class, appendFuture2); + // The basic StatusRuntimeException API is not changed. + assertTrue(actualError instanceof StatusRuntimeException); + assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode()); + assertTrue(actualError.getStatus().getDescription().contains("Connection is closed")); + } }