diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 8eab8bfdde..19d8911ee8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -198,6 +198,12 @@ class ConnectionWorker implements AutoCloseable { */ private final String writerId = UUID.randomUUID().toString(); + /* + * Test only exception behavior testing params. + */ + private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; + private long testOnlyAppendLoopSleepTime = 0; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -240,6 +246,25 @@ public void run() { appendLoop(); } }); + appendThread.setUncaughtExceptionHandler( + (Thread t, Throwable e) -> { + log.warning( + "Exception thrown from append loop, thus stream writer is shutdown due to exception: " + + e.toString()); + e.printStackTrace(); + lock.lock(); + try { + connectionFinalStatus = e; + // Move all current waiting requests to in flight queue. + while (!this.waitingRequestQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + } + } finally { + lock.unlock(); + } + cleanupInflightRequests(); + }); this.appendThread.start(); } @@ -249,6 +274,8 @@ private void resetConnection() { // It's safe to directly close the previous connection as the in flight messages // will be picked up by the next connection. this.streamConnection.close(); + Uninterruptibles.sleepUninterruptibly( + calculateSleepTimeMilli(conectionRetryCountWithoutCallback), TimeUnit.MILLISECONDS); } this.streamConnection = new StreamConnection( @@ -391,6 +418,22 @@ private void maybeWaitForInflightQuota() { inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); } + @VisibleForTesting + static long calculateSleepTimeMilli(long retryCount) { + return Math.min((long) Math.pow(2, retryCount), 60000); + } + + @VisibleForTesting + void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) { + this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime; + } + + @VisibleForTesting + void setTestOnlyRunTimeExceptionInAppendLoop( + RuntimeException testOnlyRunTimeExceptionInAppendLoop) { + this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop; + } + public long getInflightWaitSeconds() { return inflightWaitSec.longValue(); } @@ -524,6 +567,10 @@ private void appendLoop() { } finally { lock.unlock(); } + if (testOnlyRunTimeExceptionInAppendLoop != null) { + Uninterruptibles.sleepUninterruptibly(testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS); + throw testOnlyRunTimeExceptionInAppendLoop; + } resetConnection(); // Set firstRequestInConnection to indicate the next request to be sent should include // metedata. Reset everytime after reconnection. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 4edf0f3e9d..fbd0850ee0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -351,6 +352,72 @@ public void testAppendButInflightQueueFull() throws Exception { } } + @Test + public void testThrowExceptionWhileWithinAppendLoop() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); + ConnectionWorker.setMaxInflightQueueWaitTime(500); + + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop( + new RuntimeException("Any exception can happen.")); + // Sleep 1 second before erroring out. + connectionWorker.setTestOnlyAppendLoopSleepTime(1000L); + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add( + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); + } + + for (int i = 0; i < appendCount; i++) { + int finalI = i; + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> futures.get(finalI).get().getAppendResult().getOffset().getValue()); + assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen."); + } + + // The future append will directly fail. + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(100)}), + 100) + .get()); + assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen."); + } + + @Test + public void testExponentialBackoff() throws Exception { + assertThat(ConnectionWorker.calculateSleepTimeMilli(0)).isEqualTo(1); + assertThat(ConnectionWorker.calculateSleepTimeMilli(5)).isEqualTo(32); + assertThat(ConnectionWorker.calculateSleepTimeMilli(100)).isEqualTo(60000); + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult(