Skip to content

Commit

Permalink
feat: Enable instream retry for default streams when Multiplexing. (#…
Browse files Browse the repository at this point in the history
…2376)

* feat: Enable instream retry for default streams when Multiplexing.

* enable retrysettings for multiplexing test writers

---------

Co-authored-by: Evan Greco <[email protected]>
  • Loading branch information
egreco12 and Evan Greco authored Jan 25, 2024
1 parent c00661c commit 9a18523
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class ConnectionWorkerPool {
*/
private final java.time.Duration maxRetryDuration;

/*
* Retry settings for in-stream retries.
*/
private RetrySettings retrySettings;

/*
Expand Down Expand Up @@ -208,7 +211,8 @@ public abstract static class Builder {
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
@Nullable String comperssorName,
BigQueryWriteSettings clientSettings) {
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
Expand All @@ -217,8 +221,7 @@ public abstract static class Builder {
this.compressorName = comperssorName;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
// In-stream retry is not enabled for multiplexing.
this.retrySettings = null;
this.retrySettings = retrySettings;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,6 @@ private StreamWriter(Builder builder) throws IOException {
"Trying to enable connection pool in non-default stream.");
}

if (builder.retrySettings != null) {
log.warning("Retry settings is only allowed when connection pool is not enabled.");
throw new IllegalArgumentException(
"Trying to enable connection pool while providing retry settings.");
}

// We need a client to perform some getWriteStream calls.
BigQueryWriteClient client =
builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings);
Expand Down Expand Up @@ -295,7 +289,8 @@ private StreamWriter(Builder builder) throws IOException {
builder.limitExceededBehavior,
builder.traceId,
builder.compressorName,
client.getSettings());
client.getSettings(),
builder.retrySettings);
}));
validateFetchedConnectonPool(builder);
// If the client is not from outside, then shutdown the client we created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -58,6 +59,17 @@ public class ConnectionWorkerPoolTest {
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default";
private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
private static final long INITIAL_RETRY_MILLIS = 500;
private static final double RETRY_MULTIPLIER = 1.3;
private static final int MAX_RETRY_DELAY_MINUTES = 5;
private static final RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
.setRetryDelayMultiplier(RETRY_MULTIPLIER)
.setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
.build();

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -398,6 +410,7 @@ public void testCloseExternalClient()
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("us")
.setRetrySettings(retrySettings)
.build());
}

Expand Down Expand Up @@ -483,6 +496,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
clientSettings);
clientSettings,
retrySettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,25 +536,6 @@ public void testShortenStreamNameAllowed() throws Exception {
.build();
}

@Test
public void testNoRetryWhenConnectionPoolEnabled() throws Exception {
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client)
.setEnableConnectionPool(true)
.setRetrySettings(RetrySettings.newBuilder().build())
.build();
}
});
assertTrue(
ex.getMessage()
.contains("Trying to enable connection pool while providing retry settings."));
}

@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriter writer =
Expand Down Expand Up @@ -1429,6 +1410,7 @@ public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOExce
.setMaxInflightRequests(10)
.setLocation("US")
.setMaxRetryDuration(java.time.Duration.ofMillis(100))
.setRetrySettings(retrySettings)
.build();
}

Expand Down

0 comments on commit 9a18523

Please sign in to comment.