From 1a69192e2ffc6475a7e4b67c5a452f1c0e8aaddc Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Tue, 27 Sep 2022 16:51:10 -0700 Subject: [PATCH] feat: fix some todos and reject stream writer if it's created with mixed behavior of passed in client or not (#1803) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not Co-authored-by: Owl Bot --- .../storage/v1/ConnectionWorkerPool.java | 16 ++-- .../bigquery/storage/v1/StreamWriter.java | 87 ++++++++++--------- .../bigquery/storage/v1/StreamWriterTest.java | 21 +++++ 3 files changed, 78 insertions(+), 46 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index bdddeca12d..3e042eb115 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -324,18 +325,19 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w // Though atomic integer is super lightweight, add extra if check in case adding future logic. testValueCreateConnectionCount.getAndIncrement(); } - // TODO(gaole): figure out a better way to handle header / request body mismatch - // currently we use different header for the client in each connection worker to be different + // currently we use different header for the client in each connection worker to be different // as the backend require the header to have the same write_stream field as request body. BigQueryWriteClient clientAfterModification = client; if (ownsBigQueryWriteClient) { BigQueryWriteSettings settings = client.getSettings(); + + // Every header to write api is required to set write_stream in the header to help routing + // the request to correct region. + HashMap newHeaders = new HashMap<>(); + newHeaders.putAll(settings.toBuilder().getHeaderProvider().getHeaders()); + newHeaders.put("x-goog-request-params", "write_stream=" + streamName); BigQueryWriteSettings stubSettings = - settings - .toBuilder() - .setHeaderProvider( - FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName)) - .build(); + settings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build(); clientAfterModification = BigQueryWriteClient.create(stubSettings); } ConnectionWorker connectionWorker = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 9ae3440780..aa96ae66dd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -33,7 +33,6 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** @@ -68,6 +67,9 @@ public class StreamWriter implements AutoCloseable { */ private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool; + /** Test only param to tell how many times a client is created. */ + private static int testOnlyClientCreatedTimes = 0; + /** * Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be * shared by every stream writer in the same process. @@ -169,25 +171,7 @@ private StreamWriter(Builder builder) throws IOException { this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; this.location = builder.location; - boolean ownsBigQueryWriteClient; - if (builder.client == null) { - BigQueryWriteSettings stubSettings = - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(builder.credentialsProvider) - .setTransportChannelProvider(builder.channelProvider) - .setBackgroundExecutorProvider(builder.executorProvider) - .setEndpoint(builder.endpoint) - // (b/185842996): Temporily fix this by explicitly providing the header. - .setHeaderProvider( - FixedHeaderProvider.create( - "x-goog-request-params", "write_stream=" + this.streamName)) - .build(); - client = BigQueryWriteClient.create(stubSettings); - ownsBigQueryWriteClient = true; - } else { - client = builder.client; - ownsBigQueryWriteClient = false; - } + boolean ownsBigQueryWriteClient = builder.client == null; if (!builder.enableConnectionPool) { this.singleConnectionOrConnectionPool = SingleConnectionOrConnectionPool.ofSingleConnection( @@ -198,7 +182,7 @@ private StreamWriter(Builder builder) throws IOException { builder.maxInflightBytes, builder.limitExceededBehavior, builder.traceId, - client, + getBigQueryWriteClient(builder), ownsBigQueryWriteClient)); } else { if (builder.location == null || builder.location.isEmpty()) { @@ -212,29 +196,39 @@ private StreamWriter(Builder builder) throws IOException { SingleConnectionOrConnectionPool.ofConnectionPool( connectionPoolMap.computeIfAbsent( ConnectionPoolKey.create(builder.location), - (key) -> - new ConnectionWorkerPool( + (key) -> { + try { + return new ConnectionWorkerPool( builder.maxInflightRequest, builder.maxInflightBytes, builder.limitExceededBehavior, builder.traceId, - client, - ownsBigQueryWriteClient))); + getBigQueryWriteClient(builder), + ownsBigQueryWriteClient); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); validateFetchedConnectonPool(builder); - // Shut down the passed in client. Internally we will create another client inside connection - // pool for every new connection worker. - // TODO(gaole): instead of perform close outside of pool approach, change to always create - // new client in connection. - if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient() - && ownsBigQueryWriteClient) { - client.shutdown(); - try { - client.awaitTermination(150, TimeUnit.SECONDS); - } catch (InterruptedException unused) { - // Ignore interruption as this client is not used. - } - client.close(); - } + } + } + + private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException { + if (builder.client == null) { + BigQueryWriteSettings stubSettings = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setTransportChannelProvider(builder.channelProvider) + .setEndpoint(builder.endpoint) + // (b/185842996): Temporily fix this by explicitly providing the header. + .setHeaderProvider( + FixedHeaderProvider.create( + "x-goog-request-params", "write_stream=" + this.streamName)) + .build(); + testOnlyClientCreatedTimes++; + return BigQueryWriteClient.create(stubSettings); + } else { + return builder.client; } } @@ -245,6 +239,10 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) { this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), builder.traceId)) { paramsValidatedFailed = "Trace id"; + } else if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(), + builder.client == null)) { + paramsValidatedFailed = "Whether using passed in clients"; } else if (!Objects.equals( this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), builder.limitExceededBehavior)) { @@ -361,6 +359,17 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() { return singleConnectionOrConnectionPool.getKind(); } + @VisibleForTesting + static int getTestOnlyClientCreatedTimes() { + return testOnlyClientCreatedTimes; + } + + @VisibleForTesting + static void cleanUp() { + testOnlyClientCreatedTimes = 0; + connectionPoolMap.clear(); + } + /** A builder of {@link StreamWriter}s. */ public static final class Builder { private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 8ceeff4daf..2cf8a60b29 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -82,6 +82,7 @@ public void setUp() throws Exception { .setCredentialsProvider(NoCredentialsProvider.create()) .setTransportChannelProvider(serviceHelper.createChannelProvider()) .build()); + StreamWriter.cleanUp(); } @After @@ -89,6 +90,7 @@ public void tearDown() throws Exception { log.info("tearDown called"); client.close(); serviceHelper.stop(); + StreamWriter.cleanUp(); } private StreamWriter getMultiplexingTestStreamWriter() throws IOException { @@ -722,6 +724,25 @@ public void testInitialization_operationKind() throws Exception { } } + @Test + public void createStreamWithDifferentWhetherOwnsClient() throws Exception { + StreamWriter streamWriter1 = getMultiplexingTestStreamWriter(); + + assertThrows( + IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter.newBuilder(TEST_STREAM) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setLocation("US") + .setEnableConnectionPool(true) + .build(); + } + }); + } + // Timeout to ensure close() doesn't wait for done callback timeout. @Test(timeout = 10000) public void testCloseDisconnectedStream() throws Exception {