Skip to content

Commit

Permalink
feat: add public api to stream writer to set the maximum wait time
Browse files Browse the repository at this point in the history
  • Loading branch information
GaoleMeng committed Mar 30, 2023
1 parent a989ac6 commit b2c1dfb
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class ConnectionWorker implements AutoCloseable {
* We will constantly checking how much time we have been waiting for the next request callback
* if we wait too much time we will start shutting down the connections and clean up the queues.
*/
private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);

private Lock lock;
private Condition hasMessageInWaitingQueue;
Expand Down Expand Up @@ -671,10 +671,10 @@ private void appendLoop() {
// considered the same but is not considered equals(). However as long as it's never provide
// false negative we will always correctly pass writer schema to backend.
if ((!originalRequest.getWriteStream().isEmpty()
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName))
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName))
|| (originalRequest.getProtoRows().hasWriterSchema()
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
Expand Down Expand Up @@ -721,8 +721,8 @@ private void appendLoop() {
+ userClosed
+ " final exception: "
+ (this.connectionFinalStatus == null
? "null"
: this.connectionFinalStatus.toString()));
? "null"
: this.connectionFinalStatus.toString()));
// At this point, the waiting queue is drained, so no more requests.
// We can close the stream connection and handle the remaining inflight requests.
if (streamConnection != null) {
Expand Down Expand Up @@ -950,8 +950,8 @@ private void doneCallback(Throwable finalStatus) {
if (isConnectionErrorRetriable(finalStatus)
&& !userClosed
&& (maxRetryDuration.toMillis() == 0f
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
|| System.currentTimeMillis() - connectionRetryStartTime
<= maxRetryDuration.toMillis())) {
this.conectionRetryCountWithoutCallback++;
log.info(
"Retriable error "
Expand All @@ -960,7 +960,7 @@ private void doneCallback(Throwable finalStatus) {
+ conectionRetryCountWithoutCallback
+ ", millis left to retry "
+ (maxRetryDuration.toMillis()
- (System.currentTimeMillis() - connectionRetryStartTime))
- (System.currentTimeMillis() - connectionRetryStartTime))
+ ", for stream "
+ streamName
+ " id:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public String getLocation() {

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
getMissingValueInterpretationMap() {
return missingValueInterpretationMap;
}

Expand Down Expand Up @@ -518,6 +518,17 @@ public synchronized TableSchema getUpdatedSchema() {
: null;
}

/**
* Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very
* low chance, it's possible for append request to be waiting indefintely for request callback
* when Google networking SDK does not detect the networking breakage. The default timeout is
* 15 minutes.
* We are finding the root cause of the underlying SDK bug for callback not being triggered.
*/
public static void setMaxInflightRequestWaitTime(Duration waitTime) {
ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
}

long getCreationTimestamp() {
return creationTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -947,6 +948,36 @@ public void testMessageTooLarge() throws Exception {
writer.close();
}


@Test
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter.setMaxInflightRequestWaitTime(java.time.Duration.ofSeconds(1));
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

long appendCount = 10;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

// In total insert 5 requests,
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createProtoRows(new String[]{String.valueOf(i)}), i));
}

for (int i = 0; i < appendCount; i++) {
int finalI = i;
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
}
}

@Test
public void testAppendWithResetSuccess() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
Expand Down

0 comments on commit b2c1dfb

Please sign in to comment.