Skip to content

Commit

Permalink
feat: Add another StreamWriterClosedException and remove RETRY_THRESH…
Browse files Browse the repository at this point in the history
…OLD (#1713)

* feat: Add another StreamWriterClosedException and remove RETRY_THRESHOLD

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* .

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] authored Jul 27, 2022
1 parent ed3fe1f commit f8d1bd9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private boolean inflightCleanuped = false;

/*
* Retry threshold, limits how often the connection is retried before processing halts.
*/
private static final long RETRY_THRESHOLD = 3;

/*
* Indicates whether user has called Close() or not.
*/
Expand Down Expand Up @@ -327,10 +322,11 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)

if (connectionFinalStatus != null) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription(
"Connection is closed due to " + connectionFinalStatus.toString())));
"Connection is closed due to " + connectionFinalStatus.toString()),
streamName));
return requestWrapper.appendResult;
}

Expand Down Expand Up @@ -653,9 +649,7 @@ private void doneCallback(Throwable finalStatus) {
this.streamConnectionIsConnected = false;
if (connectionFinalStatus == null) {
// If the error can be retried, don't set it here, let it try to retry later on.
if (isRetriableError(finalStatus)
&& conectionRetryCountWithoutCallback < RETRY_THRESHOLD
&& !userClosed) {
if (isRetriableError(finalStatus) && !userClosed) {
this.conectionRetryCountWithoutCallback++;
log.fine(
"Retriable error "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,14 +655,33 @@ public void testWriterClosedStream() throws Exception {
}

@Test
public void testWriterException() throws Exception {
public void testWriterAlreadyClosedException() throws Exception {
StreamWriter writer = getTestStreamWriter();
writer.close();
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 0);
Exceptions.StreamWriterClosedException actualError =
assertFutureException(Exceptions.StreamWriterClosedException.class, appendFuture1);
// The basic StatusRuntimeException API is not changed.
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed"));
}

@Test
public void testWriterClosedException() throws Exception {
StreamWriter writer = getTestStreamWriter();
testBigQueryWrite.addException(Status.INTERNAL.asException());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 0);
try {
appendFuture1.get();
} catch (Exception e) {
}
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"}, 0);
Exceptions.StreamWriterClosedException actualError =
assertFutureException(Exceptions.StreamWriterClosedException.class, appendFuture2);
// The basic StatusRuntimeException API is not changed.
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is closed"));
}
}

0 comments on commit f8d1bd9

Please sign in to comment.