Skip to content

Commit

Permalink
fix: catch uncaught exception from append loop and add expoential ret…
Browse files Browse the repository at this point in the history
…ry to reconnection (#2015)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

* feat: Change new thread for each retry to be a thread pool to avoid
create/tear down too much threads if lots of retries happens

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: add back the background executor provider that's accidentally
removed

* feat: throw error when use connection pool for explicit stream

* fix: Add precision truncation to the passed in value from JSON float and
double type.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* modify the bom version

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix deadlockissue in ConnectionWorkerPool

* fix: fix deadlock issue during close + append for multiplexing

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: fix one potential root cause of deadlock issue for non-multiplexing
case

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add timeout to inflight queue waiting, and also add some extra log

* feat: allow java client lib handle switch table schema for the same stream
name

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: close before retry connection

* fix: close before retry connection

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Feb 24, 2023
1 parent 66db8fe commit 35db0fb
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ class ConnectionWorker implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();

/*
* Test only exception behavior testing params.
*/
private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
private long testOnlyAppendLoopSleepTime = 0;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -240,6 +246,25 @@ public void run() {
appendLoop();
}
});
appendThread.setUncaughtExceptionHandler(
(Thread t, Throwable e) -> {
log.warning(
"Exception thrown from append loop, thus stream writer is shutdown due to exception: "
+ e.toString());
e.printStackTrace();
lock.lock();
try {
connectionFinalStatus = e;
// Move all current waiting requests to in flight queue.
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
this.inflightRequestQueue.addLast(requestWrapper);
}
} finally {
lock.unlock();
}
cleanupInflightRequests();
});
this.appendThread.start();
}

Expand All @@ -249,6 +274,8 @@ private void resetConnection() {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
this.streamConnection.close();
Uninterruptibles.sleepUninterruptibly(
calculateSleepTimeMilli(conectionRetryCountWithoutCallback), TimeUnit.MILLISECONDS);
}
this.streamConnection =
new StreamConnection(
Expand Down Expand Up @@ -391,6 +418,22 @@ private void maybeWaitForInflightQuota() {
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
}

@VisibleForTesting
static long calculateSleepTimeMilli(long retryCount) {
return Math.min((long) Math.pow(2, retryCount), 60000);
}

@VisibleForTesting
void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) {
this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime;
}

@VisibleForTesting
void setTestOnlyRunTimeExceptionInAppendLoop(
RuntimeException testOnlyRunTimeExceptionInAppendLoop) {
this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop;
}

public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}
Expand Down Expand Up @@ -524,6 +567,10 @@ private void appendLoop() {
} finally {
lock.unlock();
}
if (testOnlyRunTimeExceptionInAppendLoop != null) {
Uninterruptibles.sleepUninterruptibly(testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS);
throw testOnlyRunTimeExceptionInAppendLoop;
}
resetConnection();
// Set firstRequestInConnection to indicate the next request to be sent should include
// metedata. Reset everytime after reconnection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -351,6 +352,72 @@ public void testAppendButInflightQueueFull() throws Exception {
}
}

@Test
public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
createProtoSchema("foo"),
100000,
100000,
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);

long appendCount = 10;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop(
new RuntimeException("Any exception can happen."));
// Sleep 1 second before erroring out.
connectionWorker.setTestOnlyAppendLoopSleepTime(1000L);

// In total insert 5 requests,
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(
sendTestMessage(
connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
}

for (int i = 0; i < appendCount; i++) {
int finalI = i;
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
}

// The future append will directly fail.
ExecutionException ex =
assertThrows(
ExecutionException.class,
() ->
sendTestMessage(
connectionWorker,
sw1,
createFooProtoRows(new String[] {String.valueOf(100)}),
100)
.get());
assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
}

@Test
public void testExponentialBackoff() throws Exception {
assertThat(ConnectionWorker.calculateSleepTimeMilli(0)).isEqualTo(1);
assertThat(ConnectionWorker.calculateSleepTimeMilli(5)).isEqualTo(32);
assertThat(ConnectionWorker.calculateSleepTimeMilli(100)).isEqualTo(60000);
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Expand Down

0 comments on commit 35db0fb

Please sign in to comment.