From 5a63d958688f45aa065b801e4e181c1e84c0f58f Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Thu, 8 Sep 2022 20:21:58 -0700 Subject: [PATCH 01/12] feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client --- .github/.OwlBot.yaml | 1 + .../bigquery/storage/v1/ConnectionWorker.java | 675 ++++++++++++++++++ .../bigquery/storage/v1/StreamWriter.java | 613 +--------------- 3 files changed, 699 insertions(+), 590 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java diff --git a/.github/.OwlBot.yaml b/.github/.OwlBot.yaml index 1a3a604eaf..ec7bb13f06 100644 --- a/.github/.OwlBot.yaml +++ b/.github/.OwlBot.yaml @@ -77,6 +77,7 @@ deep-preserve-regex: - "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1.*/StreamWriterV2.java" - "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1.*/Waiter.java" - "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java" +- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java" deep-copy-regex: - source: "/google/cloud/bigquery/storage/(v.*)/.*-java/proto-google-.*/src" diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java new file mode 100644 index 0000000000..36bf7bbaa7 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -0,0 +1,675 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.FlowController; +import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; +import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; +import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; +import com.google.common.util.concurrent.Uninterruptibles; +import com.google.protobuf.Int64Value; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.Deque; +import java.util.LinkedList; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.concurrent.GuardedBy; + +/** + * A BigQuery Stream Writer that can be used to write data into BigQuery Table. + * + *
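+ * <p>This class owns a single bi-directional connection to the BigQuery Storage write API and
+ * the background append thread that drives it. It was split out of StreamWriter so that a
+ * future multiplexing client can share connection workers across streams.
+ *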
TODO: Support batching. + */ +public class ConnectionWorker implements AutoCloseable { + private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); + + private Lock lock; + private Condition hasMessageInWaitingQueue; + private Condition inflightReduced; + + /* + * The identifier of stream to write to. + */ + private final String streamName; + + /* + * The proto schema of rows to write. + */ + private final ProtoSchema writerSchema; + + /* + * Max allowed inflight requests in the stream. Method append is blocked at this. + */ + private final long maxInflightRequests; + + /* + * Max allowed inflight bytes in the stream. Method append is blocked at this. + */ + private final long maxInflightBytes; + + /* + * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block. + */ + private final FlowController.LimitExceededBehavior limitExceededBehavior; + + /* + * TraceId for debugging purpose. + */ + private final String traceId; + + /* + * Tracks current inflight requests in the stream. + */ + @GuardedBy("lock") + private long inflightRequests = 0; + + /* + * Tracks current inflight bytes in the stream. + */ + @GuardedBy("lock") + private long inflightBytes = 0; + + /* + * Tracks how often the stream was closed due to a retriable error. Streaming will stop when the + * count hits a threshold. Streaming should only be halted, if it isn't possible to establish a + * connection. Keep track of the number of reconnections in succession. This will be reset if + * a row is successfully called back. + */ + @GuardedBy("lock") + private long conectionRetryCountWithoutCallback = 0; + + /* + * If false, streamConnection needs to be reset. + */ + @GuardedBy("lock") + private boolean streamConnectionIsConnected = false; + + /* + * A boolean to track if we cleaned up inflight queue. + */ + @GuardedBy("lock") + private boolean inflightCleanuped = false; + + /* + * Indicates whether user has called Close() or not. + */ + @GuardedBy("lock") + private boolean userClosed = false; + + /* + * The final status of connection. Set to nonnull when connection is permanently closed. + */ + @GuardedBy("lock") + private Throwable connectionFinalStatus = null; + + /* + * Contains requests buffered in the client and not yet sent to server. + */ + @GuardedBy("lock") + private final Deque waitingRequestQueue; + + /* + * Contains sent append requests waiting for response from server. + */ + @GuardedBy("lock") + private final Deque inflightRequestQueue; + + /* + * Contains the updated TableSchema. + */ + @GuardedBy("lock") + private TableSchema updatedSchema; + + /* + * A client used to interact with BigQuery. + */ + private BigQueryWriteClient client; + + /* + * If true, the client above is created by this writer and should be closed. + */ + private boolean ownsBigQueryWriteClient = false; + + /* + * Wraps the underlying bi-directional stream connection with server. + */ + private StreamConnection streamConnection; + + /* + * A separate thread to handle actual communication with server. + */ + private Thread appendThread; + + /* + * The inflight wait time for the previous sent request. + */ + private final AtomicLong inflightWaitSec = new AtomicLong(0); + + /* + * A String that uniquely identifies this writer. + */ + private final String writerId = UUID.randomUUID().toString(); + + /** The maximum size of one request. Defined by the API. 
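Requests above this limit fail locally in appendInternal with INVALID_ARGUMENT rather than being sent to the server.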
*/ + public static long getApiMaxRequestBytes() { + return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) + } + + public ConnectionWorker( + String streamName, + ProtoSchema writerSchema, + long maxInflightRequests, + long maxInflightBytes, + FlowController.LimitExceededBehavior limitExceededBehavior, + String traceId, + BigQueryWriteClient client, + boolean ownsBigQueryWriteClient) + throws IOException { + this.lock = new ReentrantLock(); + this.hasMessageInWaitingQueue = lock.newCondition(); + this.inflightReduced = lock.newCondition(); + this.streamName = streamName; + if (writerSchema == null) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription("Writer schema must be provided when building this writer.")); + } + this.writerSchema = writerSchema; + this.maxInflightRequests = maxInflightRequests; + this.maxInflightBytes = maxInflightBytes; + this.limitExceededBehavior = limitExceededBehavior; + this.traceId = traceId; + this.waitingRequestQueue = new LinkedList(); + this.inflightRequestQueue = new LinkedList(); + this.client = client; + this.ownsBigQueryWriteClient = ownsBigQueryWriteClient; + + this.appendThread = + new Thread( + new Runnable() { + @Override + public void run() { + appendLoop(); + } + }); + this.appendThread.start(); + } + + private void resetConnection() { + this.streamConnection = + new StreamConnection( + this.client, + new RequestCallback() { + @Override + public void run(AppendRowsResponse response) { + requestCallback(response); + } + }, + new DoneCallback() { + @Override + public void run(Throwable finalStatus) { + doneCallback(finalStatus); + } + }); + } + + /** Schedules the writing of rows at the end of current stream. */ + public ApiFuture append(ProtoRows rows) { + return append(rows, -1); + } + + /** Schedules the writing of rows at given offset. */ + public ApiFuture append(ProtoRows rows, long offset) { + AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); + requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); + if (offset >= 0) { + requestBuilder.setOffset(Int64Value.of(offset)); + } + return appendInternal(requestBuilder.build()); + } + + private ApiFuture appendInternal(AppendRowsRequest message) { + AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); + if (requestWrapper.messageSize > getApiMaxRequestBytes()) { + requestWrapper.appendResult.setException( + new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription( + "MessageSize is too large. Max allow: " + + getApiMaxRequestBytes() + + " Actual: " + + requestWrapper.messageSize))); + return requestWrapper.appendResult; + } + this.lock.lock(); + try { + if (userClosed) { + requestWrapper.appendResult.setException( + new Exceptions.StreamWriterClosedException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription("Connection is already closed"), + streamName, + writerId)); + return requestWrapper.appendResult; + } + // Check if queue is going to be full before adding the request. 
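+      // Sketch of what ThrowException mode means for a caller; "worker" and the retry
+      // handling are illustrative, not part of this class:
+      //
+      //   try {
+      //     ApiFuture<AppendRowsResponse> result = worker.append(rows, /*offset=*/ -1);
+      //   } catch (Exceptions.InflightRequestsLimitExceededException
+      //       | Exceptions.InflightBytesLimitExceededException e) {
+      //     // Nothing was queued for this append; back off and retry later.
+      //   }
+      //
+      // With the default Block behavior, maybeWaitForInflightQuota() below parks the caller
+      // until the inflight counts drop below the limits instead of throwing.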
+ if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { + if (this.inflightRequests + 1 >= this.maxInflightRequests) { + throw new Exceptions.InflightRequestsLimitExceededException( + writerId, this.maxInflightRequests); + } + if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) { + throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes); + } + } + + if (connectionFinalStatus != null) { + requestWrapper.appendResult.setException( + new Exceptions.StreamWriterClosedException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription( + "Connection is closed due to " + connectionFinalStatus.toString()), + streamName, + writerId)); + return requestWrapper.appendResult; + } + + ++this.inflightRequests; + this.inflightBytes += requestWrapper.messageSize; + waitingRequestQueue.addLast(requestWrapper); + hasMessageInWaitingQueue.signal(); + maybeWaitForInflightQuota(); + return requestWrapper.appendResult; + } finally { + this.lock.unlock(); + } + } + + @GuardedBy("lock") + private void maybeWaitForInflightQuota() { + long start_time = System.currentTimeMillis(); + while (this.inflightRequests >= this.maxInflightRequests + || this.inflightBytes >= this.maxInflightBytes) { + try { + inflightReduced.await(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for inflight quota. Stream: " + + streamName + + " Error: " + + e.toString()); + throw new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withCause(e) + .withDescription("Interrupted while waiting for quota.")); + } + } + inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); + } + + public long getInflightWaitSeconds() { + return inflightWaitSec.longValue(); + } + + /** @return a unique Id for the writer. */ + public String getWriterId() { + return writerId; + } + + /** Close the stream writer. Shut down all resources. */ + @Override + public void close() { + log.info("User closing stream: " + streamName); + this.lock.lock(); + try { + this.userClosed = true; + } finally { + this.lock.unlock(); + } + log.fine("Waiting for append thread to finish. Stream: " + streamName); + try { + appendThread.join(); + log.info("User close complete. Stream: " + streamName); + } catch (InterruptedException e) { + // Unexpected. Just swallow the exception with logging. + log.warning( + "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString()); + } + if (this.ownsBigQueryWriteClient) { + this.client.close(); + try { + // Backend request has a 2 minute timeout, so wait a little longer than that. + this.client.awaitTermination(150, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + } + } + } + + /* + * This loop is executed in a separate thread. + * + * It takes requests from waiting queue and sends them to server. + */ + private void appendLoop() { + Deque localQueue = new LinkedList(); + boolean streamNeedsConnecting = false; + // Set firstRequestInConnection to true immediately after connecting the steam, + // indicates then next row sent, needs the schema and other metadata. + boolean isFirstRequestInConnection = true; + while (!waitingQueueDrained()) { + this.lock.lock(); + try { + hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); + // Copy the streamConnectionIsConnected guarded by lock to a local variable. + // In addition, only reconnect if there is a retriable error. 
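+        // connectionFinalStatus != null marks a permanent failure recorded by doneCallback;
+        // in that case the loop must not reconnect, and will simply drain, fail the remaining
+        // requests in cleanupInflightRequests() and exit once waitingQueueDrained() is true.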
+ streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null; + if (streamNeedsConnecting) { + // If the stream connection is broken, any requests on inflightRequestQueue will need + // to be resent, as the new connection has no knowledge of the requests. Copy the requests + // from inflightRequestQueue and prepent them onto the waitinRequestQueue. They need to be + // prepended as they need to be sent before new requests. + while (!inflightRequestQueue.isEmpty()) { + waitingRequestQueue.addFirst(inflightRequestQueue.pollLast()); + } + } + while (!this.waitingRequestQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + localQueue.addLast(requestWrapper); + } + } catch (InterruptedException e) { + log.warning( + "Interrupted while waiting for message. Stream: " + + streamName + + " Error: " + + e.toString()); + } finally { + this.lock.unlock(); + } + + if (localQueue.isEmpty()) { + continue; + } + if (streamNeedsConnecting) { + // Set streamConnectionIsConnected to true, to indicate the stream has been connected. This + // should happen before the call to resetConnection. As it is unknown when the connection + // could be closed and the doneCallback called, and thus clearing the flag. + lock.lock(); + try { + this.streamConnectionIsConnected = true; + } finally { + lock.unlock(); + } + resetConnection(); + // Set firstRequestInConnection to indicate the next request to be sent should include + // metedata. + isFirstRequestInConnection = true; + } + while (!localQueue.isEmpty()) { + AppendRowsRequest preparedRequest = + prepareRequestBasedOnPosition( + localQueue.pollFirst().message, isFirstRequestInConnection); + // Send should only throw an exception if there is a problem with the request. The catch + // block will handle this case, and return the exception with the result. + // Otherwise send will return: + // SUCCESS: Message was sent, wait for the callback. + // STREAM_CLOSED: Stream was closed, normally or due to en error + // NOT_ENOUGH_QUOTA: Message wasn't sent due to not enough quota. + // TODO: Handle NOT_ENOUGH_QUOTA. + // In the close case, the request is in the inflight queue, and will either be returned + // to the user with an error, or will be resent. + this.streamConnection.send(preparedRequest); + isFirstRequestInConnection = false; + } + } + + log.fine("Cleanup starts. Stream: " + streamName); + // At this point, the waiting queue is drained, so no more requests. + // We can close the stream connection and handle the remaining inflight requests. + if (streamConnection != null) { + this.streamConnection.close(); + waitForDoneCallback(3, TimeUnit.MINUTES); + } + + // At this point, there cannot be more callback. It is safe to clean up all inflight requests. + log.fine( + "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName); + cleanupInflightRequests(); + log.fine("Append thread is done. Stream: " + streamName); + } + + /* + * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue. + * + * It serves as a signal to append thread that there cannot be any more requests in the waiting + * queue and it can prepare to stop. 
+ */ + private boolean waitingQueueDrained() { + this.lock.lock(); + try { + return (this.userClosed || this.connectionFinalStatus != null) + && this.waitingRequestQueue.isEmpty(); + } finally { + this.lock.unlock(); + } + } + + private void waitForDoneCallback(long duration, TimeUnit timeUnit) { + log.fine("Waiting for done callback from stream connection. Stream: " + streamName); + long deadline = System.nanoTime() + timeUnit.toNanos(duration); + while (System.nanoTime() <= deadline) { + this.lock.lock(); + try { + if (!this.streamConnectionIsConnected) { + // Done callback is received, return. + return; + } + } finally { + this.lock.unlock(); + } + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + this.lock.lock(); + try { + if (connectionFinalStatus == null) { + connectionFinalStatus = + new StatusRuntimeException( + Status.fromCode(Code.CANCELLED) + .withDescription("Timeout waiting for DoneCallback.")); + } + } finally { + this.lock.unlock(); + } + + return; + } + + private AppendRowsRequest prepareRequestBasedOnPosition( + AppendRowsRequest original, boolean isFirstRequest) { + AppendRowsRequest.Builder requestBuilder = original.toBuilder(); + if (isFirstRequest) { + if (this.writerSchema != null) { + requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema); + } + requestBuilder.setWriteStream(this.streamName); + if (this.traceId != null) { + requestBuilder.setTraceId(this.traceId); + } + } else { + requestBuilder.clearWriteStream(); + requestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + return requestBuilder.build(); + } + + private void cleanupInflightRequests() { + Throwable finalStatus = + new Exceptions.StreamWriterClosedException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription("Connection is already closed, cleanup inflight request"), + streamName, + writerId); + Deque localQueue = new LinkedList(); + this.lock.lock(); + try { + if (this.connectionFinalStatus != null) { + finalStatus = this.connectionFinalStatus; + } + while (!this.inflightRequestQueue.isEmpty()) { + localQueue.addLast(pollInflightRequestQueue()); + } + this.inflightCleanuped = true; + } finally { + this.lock.unlock(); + } + log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus); + while (!localQueue.isEmpty()) { + localQueue.pollFirst().appendResult.setException(finalStatus); + } + } + + private void requestCallback(AppendRowsResponse response) { + AppendRequestAndResponse requestWrapper; + this.lock.lock(); + if (response.hasUpdatedSchema()) { + this.updatedSchema = response.getUpdatedSchema(); + } + try { + // Had a successful connection with at least one result, reset retries. + // conectionRetryCountWithoutCallback is reset so that only multiple retries, without + // successful records sent, will cause the stream to fail. + if (conectionRetryCountWithoutCallback != 0) { + conectionRetryCountWithoutCallback = 0; + } + if (!this.inflightRequestQueue.isEmpty()) { + requestWrapper = pollInflightRequestQueue(); + } else if (inflightCleanuped) { + // It is possible when requestCallback is called, the inflight queue is already drained + // because we timed out waiting for done. + return; + } else { + // This is something not expected, we shouldn't have an empty inflight queue otherwise. 
+ log.log(Level.WARNING, "Unexpected: request callback called on an empty inflight queue."); + connectionFinalStatus = + new StatusRuntimeException( + Status.fromCode(Code.FAILED_PRECONDITION) + .withDescription("Request callback called on an empty inflight queue.")); + return; + } + } finally { + this.lock.unlock(); + } + if (response.hasError()) { + Exceptions.StorageException storageException = + Exceptions.toStorageException(response.getError(), null); + if (storageException != null) { + requestWrapper.appendResult.setException(storageException); + } else { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + requestWrapper.appendResult.setException(exception); + } + } else { + requestWrapper.appendResult.set(response); + } + } + + private boolean isRetriableError(Throwable t) { + Status status = Status.fromThrowable(t); + if (Errors.isRetryableInternalStatus(status)) { + return true; + } + return status.getCode() == Code.ABORTED + || status.getCode() == Code.UNAVAILABLE + || status.getCode() == Code.CANCELLED; + } + + private void doneCallback(Throwable finalStatus) { + log.fine( + "Received done callback. Stream: " + + streamName + + " Final status: " + + finalStatus.toString()); + this.lock.lock(); + try { + this.streamConnectionIsConnected = false; + if (connectionFinalStatus == null) { + // If the error can be retried, don't set it here, let it try to retry later on. + if (isRetriableError(finalStatus) && !userClosed) { + this.conectionRetryCountWithoutCallback++; + log.fine( + "Retriable error " + + finalStatus.toString() + + " received, retry count " + + conectionRetryCountWithoutCallback + + " for stream " + + streamName); + } else { + Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus); + this.connectionFinalStatus = storageException != null ? storageException : finalStatus; + log.info( + "Connection finished with error " + + finalStatus.toString() + + " for stream " + + streamName); + } + } + } finally { + this.lock.unlock(); + } + } + + @GuardedBy("lock") + private AppendRequestAndResponse pollInflightRequestQueue() { + AppendRequestAndResponse requestWrapper = this.inflightRequestQueue.pollFirst(); + --this.inflightRequests; + this.inflightBytes -= requestWrapper.messageSize; + this.inflightReduced.signal(); + return requestWrapper; + } + + /** Thread-safe getter of updated TableSchema */ + public synchronized TableSchema getUpdatedSchema() { + return this.updatedSchema; + } + + // Class that wraps AppendRowsRequest and its corresponding Response future. 
+ private static final class AppendRequestAndResponse { + final SettableApiFuture appendResult; + final AppendRowsRequest message; + final long messageSize; + + AppendRequestAndResponse(AppendRowsRequest message) { + this.appendResult = SettableApiFuture.create(); + this.message = message; + this.messageSize = message.getProtoRows().getSerializedSize(); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 151321e248..35eca74eec 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -16,33 +16,17 @@ package com.google.cloud.bigquery.storage.v1; import com.google.api.core.ApiFuture; -import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.cloud.bigquery.storage.util.Errors; -import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; -import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; -import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; -import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.util.Deque; -import java.util.LinkedList; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.concurrent.GuardedBy; /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. @@ -52,128 +36,13 @@ public class StreamWriter implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); - private Lock lock; - private Condition hasMessageInWaitingQueue; - private Condition inflightReduced; + private final ConnectionWorker connectionWorker; /* * The identifier of stream to write to. */ private final String streamName; - /* - * The proto schema of rows to write. - */ - private final ProtoSchema writerSchema; - - /* - * Max allowed inflight requests in the stream. Method append is blocked at this. - */ - private final long maxInflightRequests; - - /* - * Max allowed inflight bytes in the stream. Method append is blocked at this. - */ - private final long maxInflightBytes; - - /* - * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block. - */ - private final FlowController.LimitExceededBehavior limitExceededBehavior; - - /* - * TraceId for debugging purpose. - */ - private final String traceId; - - /* - * Tracks current inflight requests in the stream. - */ - @GuardedBy("lock") - private long inflightRequests = 0; - - /* - * Tracks current inflight bytes in the stream. - */ - @GuardedBy("lock") - private long inflightBytes = 0; - - /* - * Tracks how often the stream was closed due to a retriable error. 
Streaming will stop when the - * count hits a threshold. Streaming should only be halted, if it isn't possible to establish a - * connection. Keep track of the number of reconnections in succession. This will be reset if - * a row is successfully called back. - */ - @GuardedBy("lock") - private long conectionRetryCountWithoutCallback = 0; - - /* - * If false, streamConnection needs to be reset. - */ - @GuardedBy("lock") - private boolean streamConnectionIsConnected = false; - - /* - * A boolean to track if we cleaned up inflight queue. - */ - @GuardedBy("lock") - private boolean inflightCleanuped = false; - - /* - * Indicates whether user has called Close() or not. - */ - @GuardedBy("lock") - private boolean userClosed = false; - - /* - * The final status of connection. Set to nonnull when connection is permanently closed. - */ - @GuardedBy("lock") - private Throwable connectionFinalStatus = null; - - /* - * Contains requests buffered in the client and not yet sent to server. - */ - @GuardedBy("lock") - private final Deque waitingRequestQueue; - - /* - * Contains sent append requests waiting for response from server. - */ - @GuardedBy("lock") - private final Deque inflightRequestQueue; - - /* - * Contains the updated TableSchema. - */ - @GuardedBy("lock") - private TableSchema updatedSchema; - - /* - * A client used to interact with BigQuery. - */ - private BigQueryWriteClient client; - - /* - * If true, the client above is created by this writer and should be closed. - */ - private boolean ownsBigQueryWriteClient = false; - - /* - * Wraps the underlying bi-directional stream connection with server. - */ - private StreamConnection streamConnection; - - /* - * A separate thread to handle actual communication with server. - */ - private Thread appendThread; - - /* - * The inflight wait time for the previous sent request. - */ - private final AtomicLong inflightWaitSec = new AtomicLong(0); - /* * A String that uniquely identifies this writer. 
*/ @@ -185,22 +54,9 @@ public static long getApiMaxRequestBytes() { } private StreamWriter(Builder builder) throws IOException { - this.lock = new ReentrantLock(); - this.hasMessageInWaitingQueue = lock.newCondition(); - this.inflightReduced = lock.newCondition(); + BigQueryWriteClient client; this.streamName = builder.streamName; - if (builder.writerSchema == null) { - throw new StatusRuntimeException( - Status.fromCode(Code.INVALID_ARGUMENT) - .withDescription("Writer schema must be provided when building this writer.")); - } - this.writerSchema = builder.writerSchema; - this.maxInflightRequests = builder.maxInflightRequest; - this.maxInflightBytes = builder.maxInflightBytes; - this.limitExceededBehavior = builder.limitExceededBehavior; - this.traceId = builder.traceId; - this.waitingRequestQueue = new LinkedList(); - this.inflightRequestQueue = new LinkedList(); + boolean ownsBigQueryWriteClient; if (builder.client == null) { BigQueryWriteSettings stubSettings = BigQueryWriteSettings.newBuilder() @@ -212,40 +68,22 @@ private StreamWriter(Builder builder) throws IOException { FixedHeaderProvider.create( "x-goog-request-params", "write_stream=" + this.streamName)) .build(); - this.client = BigQueryWriteClient.create(stubSettings); - this.ownsBigQueryWriteClient = true; + client = BigQueryWriteClient.create(stubSettings); + ownsBigQueryWriteClient = true; } else { - this.client = builder.client; - this.ownsBigQueryWriteClient = false; - } - - this.appendThread = - new Thread( - new Runnable() { - @Override - public void run() { - appendLoop(); - } - }); - this.appendThread.start(); - } - - private void resetConnection() { - this.streamConnection = - new StreamConnection( - this.client, - new RequestCallback() { - @Override - public void run(AppendRowsResponse response) { - requestCallback(response); - } - }, - new DoneCallback() { - @Override - public void run(Throwable finalStatus) { - doneCallback(finalStatus); - } - }); + client = builder.client; + ownsBigQueryWriteClient = false; + } + connectionWorker = + new ConnectionWorker( + builder.streamName, + builder.writerSchema, + builder.maxInflightRequest, + builder.maxInflightBytes, + builder.limitExceededBehavior, + builder.traceId, + client, + ownsBigQueryWriteClient); } /** @@ -285,91 +123,7 @@ public ApiFuture append(ProtoRows rows) { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { - AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); - if (offset >= 0) { - requestBuilder.setOffset(Int64Value.of(offset)); - } - return appendInternal(requestBuilder.build()); - } - - private ApiFuture appendInternal(AppendRowsRequest message) { - AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message); - if (requestWrapper.messageSize > getApiMaxRequestBytes()) { - requestWrapper.appendResult.setException( - new StatusRuntimeException( - Status.fromCode(Code.INVALID_ARGUMENT) - .withDescription( - "MessageSize is too large. 
Max allow: " - + getApiMaxRequestBytes() - + " Actual: " - + requestWrapper.messageSize))); - return requestWrapper.appendResult; - } - this.lock.lock(); - try { - if (userClosed) { - requestWrapper.appendResult.setException( - new Exceptions.StreamWriterClosedException( - Status.fromCode(Status.Code.FAILED_PRECONDITION) - .withDescription("Connection is already closed"), - streamName, - writerId)); - return requestWrapper.appendResult; - } - // Check if queue is going to be full before adding the request. - if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) { - if (this.inflightRequests + 1 >= this.maxInflightRequests) { - throw new Exceptions.InflightRequestsLimitExceededException( - writerId, this.maxInflightRequests); - } - if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) { - throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes); - } - } - - if (connectionFinalStatus != null) { - requestWrapper.appendResult.setException( - new Exceptions.StreamWriterClosedException( - Status.fromCode(Status.Code.FAILED_PRECONDITION) - .withDescription( - "Connection is closed due to " + connectionFinalStatus.toString()), - streamName, - writerId)); - return requestWrapper.appendResult; - } - - ++this.inflightRequests; - this.inflightBytes += requestWrapper.messageSize; - waitingRequestQueue.addLast(requestWrapper); - hasMessageInWaitingQueue.signal(); - maybeWaitForInflightQuota(); - return requestWrapper.appendResult; - } finally { - this.lock.unlock(); - } - } - - @GuardedBy("lock") - private void maybeWaitForInflightQuota() { - long start_time = System.currentTimeMillis(); - while (this.inflightRequests >= this.maxInflightRequests - || this.inflightBytes >= this.maxInflightBytes) { - try { - inflightReduced.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for inflight quota. Stream: " - + streamName - + " Error: " - + e.toString()); - throw new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withCause(e) - .withDescription("Interrupted while waiting for quota.")); - } - } - inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); + return this.connectionWorker.append(rows, offset); } /** @@ -381,12 +135,12 @@ private void maybeWaitForInflightQuota() { * stream case. */ public long getInflightWaitSeconds() { - return inflightWaitSec.longValue(); + return connectionWorker.getInflightWaitSeconds(); } /** @return a unique Id for the writer. */ public String getWriterId() { - return writerId; + return connectionWorker.getWriterId(); } /** @return name of the Stream that this writer is working on. */ @@ -397,315 +151,7 @@ public String getStreamName() { /** Close the stream writer. Shut down all resources. */ @Override public void close() { - log.info("User closing stream: " + streamName); - this.lock.lock(); - try { - this.userClosed = true; - } finally { - this.lock.unlock(); - } - log.fine("Waiting for append thread to finish. Stream: " + streamName); - try { - appendThread.join(); - log.info("User close complete. Stream: " + streamName); - } catch (InterruptedException e) { - // Unexpected. Just swallow the exception with logging. - log.warning( - "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString()); - } - if (this.ownsBigQueryWriteClient) { - this.client.close(); - try { - // Backend request has a 2 minute timeout, so wait a little longer than that. 
- this.client.awaitTermination(150, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - } - } - } - - /* - * This loop is executed in a separate thread. - * - * It takes requests from waiting queue and sends them to server. - */ - private void appendLoop() { - Deque localQueue = new LinkedList(); - boolean streamNeedsConnecting = false; - // Set firstRequestInConnection to true immediately after connecting the steam, - // indicates then next row sent, needs the schema and other metadata. - boolean isFirstRequestInConnection = true; - while (!waitingQueueDrained()) { - this.lock.lock(); - try { - hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); - // Copy the streamConnectionIsConnected guarded by lock to a local variable. - // In addition, only reconnect if there is a retriable error. - streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null; - if (streamNeedsConnecting) { - // If the stream connection is broken, any requests on inflightRequestQueue will need - // to be resent, as the new connection has no knowledge of the requests. Copy the requests - // from inflightRequestQueue and prepent them onto the waitinRequestQueue. They need to be - // prepended as they need to be sent before new requests. - while (!inflightRequestQueue.isEmpty()) { - waitingRequestQueue.addFirst(inflightRequestQueue.pollLast()); - } - } - while (!this.waitingRequestQueue.isEmpty()) { - AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); - this.inflightRequestQueue.addLast(requestWrapper); - localQueue.addLast(requestWrapper); - } - } catch (InterruptedException e) { - log.warning( - "Interrupted while waiting for message. Stream: " - + streamName - + " Error: " - + e.toString()); - } finally { - this.lock.unlock(); - } - - if (localQueue.isEmpty()) { - continue; - } - if (streamNeedsConnecting) { - // Set streamConnectionIsConnected to true, to indicate the stream has been connected. This - // should happen before the call to resetConnection. As it is unknown when the connection - // could be closed and the doneCallback called, and thus clearing the flag. - lock.lock(); - try { - this.streamConnectionIsConnected = true; - } finally { - lock.unlock(); - } - resetConnection(); - // Set firstRequestInConnection to indicate the next request to be sent should include - // metedata. - isFirstRequestInConnection = true; - } - while (!localQueue.isEmpty()) { - AppendRowsRequest preparedRequest = - prepareRequestBasedOnPosition( - localQueue.pollFirst().message, isFirstRequestInConnection); - // Send should only throw an exception if there is a problem with the request. The catch - // block will handle this case, and return the exception with the result. - // Otherwise send will return: - // SUCCESS: Message was sent, wait for the callback. - // STREAM_CLOSED: Stream was closed, normally or due to en error - // NOT_ENOUGH_QUOTA: Message wasn't sent due to not enough quota. - // TODO: Handle NOT_ENOUGH_QUOTA. - // In the close case, the request is in the inflight queue, and will either be returned - // to the user with an error, or will be resent. - this.streamConnection.send(preparedRequest); - isFirstRequestInConnection = false; - } - } - - log.fine("Cleanup starts. Stream: " + streamName); - // At this point, the waiting queue is drained, so no more requests. - // We can close the stream connection and handle the remaining inflight requests. 
- if (streamConnection != null) { - this.streamConnection.close(); - waitForDoneCallback(3, TimeUnit.MINUTES); - } - - // At this point, there cannot be more callback. It is safe to clean up all inflight requests. - log.fine( - "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName); - cleanupInflightRequests(); - log.fine("Append thread is done. Stream: " + streamName); - } - - /* - * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue. - * - * It serves as a signal to append thread that there cannot be any more requests in the waiting - * queue and it can prepare to stop. - */ - private boolean waitingQueueDrained() { - this.lock.lock(); - try { - return (this.userClosed || this.connectionFinalStatus != null) - && this.waitingRequestQueue.isEmpty(); - } finally { - this.lock.unlock(); - } - } - - private void waitForDoneCallback(long duration, TimeUnit timeUnit) { - log.fine("Waiting for done callback from stream connection. Stream: " + streamName); - long deadline = System.nanoTime() + timeUnit.toNanos(duration); - while (System.nanoTime() <= deadline) { - this.lock.lock(); - try { - if (!this.streamConnectionIsConnected) { - // Done callback is received, return. - return; - } - } finally { - this.lock.unlock(); - } - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } - this.lock.lock(); - try { - if (connectionFinalStatus == null) { - connectionFinalStatus = - new StatusRuntimeException( - Status.fromCode(Code.CANCELLED) - .withDescription("Timeout waiting for DoneCallback.")); - } - } finally { - this.lock.unlock(); - } - - return; - } - - private AppendRowsRequest prepareRequestBasedOnPosition( - AppendRowsRequest original, boolean isFirstRequest) { - AppendRowsRequest.Builder requestBuilder = original.toBuilder(); - if (isFirstRequest) { - if (this.writerSchema != null) { - requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema); - } - requestBuilder.setWriteStream(this.streamName); - if (this.traceId != null) { - requestBuilder.setTraceId(this.traceId); - } - } else { - requestBuilder.clearWriteStream(); - requestBuilder.getProtoRowsBuilder().clearWriterSchema(); - } - return requestBuilder.build(); - } - - private void cleanupInflightRequests() { - Throwable finalStatus = - new Exceptions.StreamWriterClosedException( - Status.fromCode(Status.Code.FAILED_PRECONDITION) - .withDescription("Connection is already closed, cleanup inflight request"), - streamName, - writerId); - Deque localQueue = new LinkedList(); - this.lock.lock(); - try { - if (this.connectionFinalStatus != null) { - finalStatus = this.connectionFinalStatus; - } - while (!this.inflightRequestQueue.isEmpty()) { - localQueue.addLast(pollInflightRequestQueue()); - } - this.inflightCleanuped = true; - } finally { - this.lock.unlock(); - } - log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus); - while (!localQueue.isEmpty()) { - localQueue.pollFirst().appendResult.setException(finalStatus); - } - } - - private void requestCallback(AppendRowsResponse response) { - AppendRequestAndResponse requestWrapper; - this.lock.lock(); - if (response.hasUpdatedSchema()) { - this.updatedSchema = response.getUpdatedSchema(); - } - try { - // Had a successful connection with at least one result, reset retries. - // conectionRetryCountWithoutCallback is reset so that only multiple retries, without - // successful records sent, will cause the stream to fail. 
- if (conectionRetryCountWithoutCallback != 0) { - conectionRetryCountWithoutCallback = 0; - } - if (!this.inflightRequestQueue.isEmpty()) { - requestWrapper = pollInflightRequestQueue(); - } else if (inflightCleanuped) { - // It is possible when requestCallback is called, the inflight queue is already drained - // because we timed out waiting for done. - return; - } else { - // This is something not expected, we shouldn't have an empty inflight queue otherwise. - log.log(Level.WARNING, "Unexpected: request callback called on an empty inflight queue."); - connectionFinalStatus = - new StatusRuntimeException( - Status.fromCode(Code.FAILED_PRECONDITION) - .withDescription("Request callback called on an empty inflight queue.")); - return; - } - } finally { - this.lock.unlock(); - } - if (response.hasError()) { - Exceptions.StorageException storageException = - Exceptions.toStorageException(response.getError(), null); - if (storageException != null) { - requestWrapper.appendResult.setException(storageException); - } else { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.fromCodeValue(response.getError().getCode()) - .withDescription(response.getError().getMessage())); - requestWrapper.appendResult.setException(exception); - } - } else { - requestWrapper.appendResult.set(response); - } - } - - private boolean isRetriableError(Throwable t) { - Status status = Status.fromThrowable(t); - if (Errors.isRetryableInternalStatus(status)) { - return true; - } - return status.getCode() == Code.ABORTED - || status.getCode() == Code.UNAVAILABLE - || status.getCode() == Code.CANCELLED; - } - - private void doneCallback(Throwable finalStatus) { - log.fine( - "Received done callback. Stream: " - + streamName - + " Final status: " - + finalStatus.toString()); - this.lock.lock(); - try { - this.streamConnectionIsConnected = false; - if (connectionFinalStatus == null) { - // If the error can be retried, don't set it here, let it try to retry later on. - if (isRetriableError(finalStatus) && !userClosed) { - this.conectionRetryCountWithoutCallback++; - log.fine( - "Retriable error " - + finalStatus.toString() - + " received, retry count " - + conectionRetryCountWithoutCallback - + " for stream " - + streamName); - } else { - Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus); - this.connectionFinalStatus = storageException != null ? storageException : finalStatus; - log.info( - "Connection finished with error " - + finalStatus.toString() - + " for stream " - + streamName); - } - } - } finally { - this.lock.unlock(); - } - } - - @GuardedBy("lock") - private AppendRequestAndResponse pollInflightRequestQueue() { - AppendRequestAndResponse requestWrapper = this.inflightRequestQueue.pollFirst(); - --this.inflightRequests; - this.inflightBytes -= requestWrapper.messageSize; - this.inflightReduced.signal(); - return requestWrapper; + this.connectionWorker.close(); } /** @@ -724,7 +170,7 @@ public static StreamWriter.Builder newBuilder(String streamName) { /** Thread-safe getter of updated TableSchema */ public synchronized TableSchema getUpdatedSchema() { - return this.updatedSchema; + return connectionWorker.getUpdatedSchema(); } /** A builder of {@link StreamWriter}s. */ @@ -847,17 +293,4 @@ public StreamWriter build() throws IOException { return new StreamWriter(this); } } - - // Class that wraps AppendRowsRequest and its corresponding Response future. 
- private static final class AppendRequestAndResponse { - final SettableApiFuture appendResult; - final AppendRowsRequest message; - final long messageSize; - - AppendRequestAndResponse(AppendRowsRequest message) { - this.appendResult = SettableApiFuture.create(); - this.message = message; - this.messageSize = message.getProtoRows().getSerializedSize(); - } - } } From 5a133022897e3d95de313a56f1678370f8d74c0b Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Tue, 13 Sep 2022 12:50:28 -0700 Subject: [PATCH 02/12] feat: add connection worker pool skeleton, used for multiplexing client --- .github/.OwlBot.yaml | 1 + google-cloud-bigquerystorage/pom.xml | 9 +- .../storage/v1/ConnectionWorkerPool.java | 179 ++++++++++++++++++ 3 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java diff --git a/.github/.OwlBot.yaml b/.github/.OwlBot.yaml index ec7bb13f06..86af849164 100644 --- a/.github/.OwlBot.yaml +++ b/.github/.OwlBot.yaml @@ -78,6 +78,7 @@ deep-preserve-regex: - "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1.*/Waiter.java" - "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java" - "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java" +- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java" deep-copy-regex: - source: "/google/cloud/bigquery/storage/(v.*)/.*-java/proto-google-.*/src" diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index d7936d4d92..8d1078f60d 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -63,6 +63,14 @@ com.google.api api-common + + com.google.auto.value + auto-value + + + com.google.auto.value + auto-value-annotations + com.google.protobuf protobuf-java @@ -134,7 +142,6 @@ junit test - com.google.truth truth diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java new file mode 100644 index 0000000000..435f199f14 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -0,0 +1,179 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowController; +import com.google.auto.value.AutoValue; +import javax.annotation.concurrent.GuardedBy; + +public class ConnectionWorkerPool { + /* + * Max allowed inflight requests in the stream. Method append is blocked at this. + */ + private final long maxInflightRequests; + + /* + * Max allowed inflight bytes in the stream. 
Method append is blocked at this. + */ + private final long maxInflightBytes; + + /* + * Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block. + */ + private final FlowController.LimitExceededBehavior limitExceededBehavior; + + /* + * TraceId for debugging purpose. + */ + private final String traceId; + + /* + * Tracks current inflight requests in the stream. + */ + @GuardedBy("lock") + private long inflightRequests = 0; + + /* + * Tracks current inflight bytes in the stream. + */ + @GuardedBy("lock") + private long inflightBytes = 0; + + /* + * Tracks how often the stream was closed due to a retriable error. Streaming will stop when the + * count hits a threshold. Streaming should only be halted, if it isn't possible to establish a + * connection. Keep track of the number of reconnections in succession. This will be reset if + * a row is successfully called back. + */ + @GuardedBy("lock") + private long conectionRetryCountWithoutCallback = 0; + + /* + * If false, streamConnection needs to be reset. + */ + @GuardedBy("lock") + private boolean streamConnectionIsConnected = false; + + /* + * A boolean to track if we cleaned up inflight queue. + */ + @GuardedBy("lock") + private boolean inflightCleanuped = false; + + /* + * Indicates whether user has called Close() or not. + */ + @GuardedBy("lock") + private boolean userClosed = false; + + /* + * The final status of connection. Set to nonnull when connection is permanently closed. + */ + @GuardedBy("lock") + private Throwable connectionFinalStatus = null; + + /* + * Contains the updated TableSchema. + */ + @GuardedBy("lock") + private TableSchema updatedSchema; + + /* + * A client used to interact with BigQuery. + */ + private BigQueryWriteClient client; + + /* + * If true, the client above is created by this writer and should be closed. + */ + private boolean ownsBigQueryWriteClient = false; + + /** Settings for connection pool. */ + @AutoValue + public abstract static class Settings { + /** + * The minimum connections each pool created before trying to reuse the previously created + * connection in multiplexing mode. + */ + abstract int minConnectionsPerPool(); + + /** The maximum connections per connection pool. */ + abstract int maxConnectionsPerPool(); + + public static Builder builder() { + return new AutoValue_ConnectionWorkerPool_Settings.Builder() + .setMinConnectionsPerPool(2) + .setMaxConnectionsPerPool(10); + } + + /** Builder for the options to config {@link ConnectionWorkerPool}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setMinConnectionsPerPool(int value); + + public abstract Builder setMaxConnectionsPerPool(int value); + + public abstract Settings build(); + } + } + + /** Static setting for connection pool. */ + private static Settings settings = Settings.builder().build(); + + public ConnectionWorkerPool( + long maxInflightRequests, + long maxInflightBytes, + FlowController.LimitExceededBehavior limitExceededBehavior, + String traceId, + BigQueryWriteClient client, + boolean ownsBigQueryWriteClient) { + this.maxInflightRequests = maxInflightRequests; + this.maxInflightBytes = maxInflightBytes; + this.limitExceededBehavior = limitExceededBehavior; + this.traceId = traceId; + this.client = client; + this.ownsBigQueryWriteClient = ownsBigQueryWriteClient; + } + + /** + * Sets static connection pool options. + * + *
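+   * <p>A hypothetical usage sketch; 4 and 8 are illustrative values, not the defaults:
+   *
+   * <pre>
+   * ConnectionWorkerPool.setOptions(
+   *     ConnectionWorkerPool.Settings.builder()
+   *         .setMinConnectionsPerPool(4)
+   *         .setMaxConnectionsPerPool(8)
+   *         .build());
+   * </pre>
+   *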
Note: this method should be triggered prior to the construction of connection pool. + */ + public static void setOptions(Settings settings) { + ConnectionWorkerPool.settings = settings; + } + + /** Distributes the writing of a message to an underlying connection. */ + public ApiFuture append( + StreamWriter streamWriter, ProtoRows rows) { + throw new RuntimeException("Append is not implemented!"); + } + + /** + * Distributes the writing of a message to an underlying connection. + */ + public ApiFuture append( + StreamWriter streamWriter, ProtoRows rows, long offset) { + throw new RuntimeException("append with offset is not implemented on connection pool!"); + } + + /** Close the stream writer. Shut down all resources. */ + public void close(StreamWriter streamWriter) { + throw new RuntimeException("close is implemented on connection pool"); + } +} From 8a81ad34824d7409c9b2c62594421105296b974e Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Wed, 14 Sep 2022 13:26:08 -0700 Subject: [PATCH 03/12] feat: add Load api for connection worker for multiplexing client --- .../bigquery/storage/v1/ConnectionWorker.java | 76 +++++++++++++++++++ .../storage/v1/ConnectionWorkerTest.java | 56 ++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 36bf7bbaa7..081ab0340b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -18,16 +18,19 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.util.Errors; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.util.Comparator; import java.util.Deque; import java.util.LinkedList; import java.util.UUID; @@ -672,4 +675,77 @@ private static final class AppendRequestAndResponse { this.messageSize = message.getProtoRows().getSerializedSize(); } } + + + /** + * Represent the current workload for this worker. Used for multiplexing algorithm to determine + * the distribution of requests. + */ + @AutoValue + public abstract static class Load { + // Consider the load on this worker to be overwhelmed when above some percentage of + // in-flight bytes or in-flight requests count. + private static double overwhelmedInflightCount = 0.5; + private static double overwhelmedInflightBytes = 0.6; + + // Number of in-flight requests bytes in the worker. + abstract long inFlightRequestsBytes(); + + // Number of in-flight requests count in the worker. + abstract long inFlightRequestsCount(); + + // Number of destination handled by this worker. 
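+    // In multiplexing mode a single worker can serve several write streams, so a higher
+    // destination count means this connection is already more widely shared.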
+ abstract long destinationCount(); + + // Max number of in-flight requests count allowed. + abstract long maxInflightBytes(); + + // Max number of in-flight requests bytes allowed. + abstract long maxInflightCount(); + + static Load create( + long inFlightRequestsBytes, + long inFlightRequestsCount, + long destinationCount, + long maxInflightBytes, + long maxInflightCount) { + return new AutoValue_ConnectionWorker_Load( + inFlightRequestsBytes, + inFlightRequestsCount, + destinationCount, + maxInflightBytes, + maxInflightCount); + } + + boolean isOverwhelmed() { + // Consider only in flight bytes and count for now, as by experiment those two are the most + // efficient and has great simplity. + return inFlightRequestsCount() > overwhelmedInflightCount * maxInflightCount() + || inFlightRequestsBytes() > overwhelmedInflightBytes * maxInflightBytes(); + } + + // Compares two different load. First compare in flight request bytes split by size 1024 bucket. + // Then compare the inflight requests count. + // Then compare destination count of the two connections. + public static final Comparator LOAD_COMPARATOR = + Comparator.comparing((Load key) -> (int) (key.inFlightRequestsBytes() / 1024)) + .thenComparing((Load key) -> (int) (key.inFlightRequestsCount() / 100)) + .thenComparing(Load::destinationCount); + + // Compares two different load without bucket, used in smaller scale unit testing. + public static final Comparator TEST_LOAD_COMPARATOR = + Comparator.comparing((Load key) -> (int) key.inFlightRequestsBytes()) + .thenComparing((Load key) -> (int) key.inFlightRequestsCount()) + .thenComparing(Load::destinationCount); + + @VisibleForTesting + public static void setOverwhelmedBytesThreshold(double newThreshold) { + overwhelmedInflightBytes = newThreshold; + } + + @VisibleForTesting + public static void setOverwhelmedCountsThreshold(double newThreshold) { + overwhelmedInflightCount = newThreshold; + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java new file mode 100644 index 0000000000..35d8d5cf09 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ConnectionWorkerTest { + @Test + public void testLoadCompare_compareLoad() { + // In flight bytes bucket is split as per 1024 requests per bucket. 
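+    // That is inFlightRequestsBytes / 1024: 1000 bytes falls in bucket 0 and 2000 bytes in
+    // bucket 1, which is why load1 below sorts before load2 despite its higher request and
+    // destination counts.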
+ // When the in-flight bytes fall into a lower bucket, the load is smaller even if the + // destination count and request count are higher. + Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10); + Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load1, load2)).isLessThan(0); + + // In-flight bytes in the same request-bytes bucket fall back to comparing request count. + Load load3 = ConnectionWorker.Load.create(1, 300, 10, 0, 10); + Load load4 = ConnectionWorker.Load.create(10, 1, 10, 0, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load3, load4)).isGreaterThan(0); + + // In-flight requests and bytes in the same buckets fall back to comparing destination count. + Load load5 = ConnectionWorker.Load.create(200, 1, 10, 1000, 10); + Load load6 = ConnectionWorker.Load.create(100, 10, 10, 1000, 10); + assertThat(Load.LOAD_COMPARATOR.compare(load5, load6) == 0).isTrue(); + } + + @Test + public void testLoadIsOverWhelmed() { + // Both in-flight bytes and request count are considered in the overwhelmed calculation; + // load1 trips the bytes threshold (60 > 0.6 * 90). + Load load1 = ConnectionWorker.Load.create(60, 10, 100, 90, 100); + assertThat(load1.isOverwhelmed()).isTrue(); + + Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100); + assertThat(load2.isOverwhelmed()).isFalse(); + } +} From 7a6d91998f45b2b25855ee907c6c5dac963f25c1 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Thu, 15 Sep 2022 13:47:58 -0700 Subject: [PATCH 04/12] feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination --- .../clirr-ignored-differences.xml | 15 ++ .../bigquery/storage/v1/ConnectionWorker.java | 111 +++++---- .../bigquery/storage/v1/StreamWriter.java | 6 +- .../storage/v1/ConnectionWorkerTest.java | 215 ++++++++++++++++++ 4 files changed, 307 insertions(+), 40 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index ca9d4778e6..69e67b9464 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -25,4 +25,19 @@ <className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException</className> <method>Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String)</method> </difference> + <difference> + <differenceType>7004</differenceType> + <className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className> + <method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows, long)</method> + </difference> + <difference> + <differenceType>7009</differenceType> + <className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className> + <method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows, long)</method> + </difference> + <difference> + <differenceType>7002</differenceType> + <className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className> + <method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method> + </difference> </differences> diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 743f926322..0b75813fa8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -33,7 +33,9 @@ import java.util.Comparator; import java.util.Deque; import java.util.LinkedList; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import
java.util.concurrent.locks.Condition; @@ -47,6 +49,8 @@ * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * *

TODO: Support batching. + * + *

TODO: support updated schema */ public class ConnectionWorker implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); @@ -56,14 +60,15 @@ public class ConnectionWorker implements AutoCloseable { private Condition inflightReduced; /* - * The identifier of stream to write to. + * The identifier of the current stream to write to. This stream name can change during + * multiplexing. */ - private final String streamName; + private String streamName; /* - * The proto schema of rows to write. + * The proto schema of rows to write. This schema can change during multiplexing. */ - private final ProtoSchema writerSchema; + private ProtoSchema writerSchema; /* * Max allowed inflight requests in the stream. Method append is blocked at this. @@ -142,6 +147,11 @@ public class ConnectionWorker implements AutoCloseable { @GuardedBy("lock") private final Deque<AppendRequestAndResponse> inflightRequestQueue; + /* + * Tracks the set of destinations handled by this connection. + */ + private final Set<String> destinationSet = ConcurrentHashMap.newKeySet(); + /* * Contains the updated TableSchema. */ @@ -241,18 +251,16 @@ public void run(Throwable finalStatus) { }); } - /** Schedules the writing of rows at the end of current stream. */ - public ApiFuture<AppendRowsResponse> append(ProtoRows rows) { - return append(rows, -1); - } - /** Schedules the writing of rows at given offset. */ - public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) { + ApiFuture<AppendRowsResponse> append( + String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) { AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build()); + requestBuilder.setProtoRows( + ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build()); if (offset >= 0) { requestBuilder.setOffset(Int64Value.of(offset)); } + requestBuilder.setWriteStream(streamName); return appendInternal(requestBuilder.build()); } @@ -381,9 +389,13 @@ public void close() { private void appendLoop() { Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>(); boolean streamNeedsConnecting = false; - // Set firstRequestInConnection to true immediately after connecting the steam, - // indicates then next row sent, needs the schema and other metadata. - boolean isFirstRequestInConnection = true; + + // Indicates whether we are at the first request after switching destination. + // True means the schema and other metadata are needed. + boolean firstRequestForDestinationSwitch = true; + // Represents whether we have entered multiplexing. + boolean isMultiplexing = false; + while (!waitingQueueDrained()) { this.lock.lock(); try { @@ -430,13 +442,43 @@ private void appendLoop() { } resetConnection(); // Set firstRequestInConnection to indicate the next request to be sent should include - // metedata. - isFirstRequestInConnection = true; + // metadata. Reset every time after reconnection. + firstRequestForDestinationSwitch = true; } while (!localQueue.isEmpty()) { - AppendRowsRequest preparedRequest = - prepareRequestBasedOnPosition( - localQueue.pollFirst().message, isFirstRequestInConnection); + AppendRowsRequest originalRequest = localQueue.pollFirst().message; + AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder(); + + // We consider ourselves in multiplexing mode once we see a different non-empty stream name.
+ if (!originalRequest.getWriteStream().isEmpty() + && !streamName.isEmpty() + && !originalRequest.getWriteStream().equals(streamName)) { + streamName = originalRequest.getWriteStream(); + writerSchema = originalRequest.getProtoRows().getWriterSchema(); + isMultiplexing = true; + firstRequestForDestinationSwitch = true; + } + + if (firstRequestForDestinationSwitch) { + // At the first request after every table switch, including the first request in + // the connection, we attach both the stream name and the table schema to the request. + // We don't support schema changes during multiplexing for the same stream name. + destinationSet.add(streamName); + if (this.traceId != null) { + originalRequestBuilder.setTraceId(this.traceId); + } + firstRequestForDestinationSwitch = false; + } else if (isMultiplexing) { + // If we are not at the first request after a table switch, but we are in multiplexing + // mode, we only need the stream name but not the schema in the request. + originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } else { + // If we are neither at the first request nor in multiplexing mode, send the request + // with no schema and no stream name. + originalRequestBuilder.clearWriteStream(); + originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema(); + } + // Send should only throw an exception if there is a problem with the request. The catch // block will handle this case, and return the exception with the result. // Otherwise send will return: @@ -446,8 +488,7 @@ private void appendLoop() { // TODO: Handle NOT_ENOUGH_QUOTA. // In the close case, the request is in the inflight queue, and will either be returned // to the user with an error, or will be resent. - this.streamConnection.send(preparedRequest); - isFirstRequestInConnection = false; + this.streamConnection.send(originalRequestBuilder.build()); } } @@ -512,24 +553,6 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) { return; } - private AppendRowsRequest prepareRequestBasedOnPosition( - AppendRowsRequest original, boolean isFirstRequest) { - AppendRowsRequest.Builder requestBuilder = original.toBuilder(); - if (isFirstRequest) { - if (this.writerSchema != null) { - requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema); - } - requestBuilder.setWriteStream(this.streamName); - if (this.traceId != null) { - requestBuilder.setTraceId(this.traceId); - } - } else { - requestBuilder.clearWriteStream(); - requestBuilder.getProtoRowsBuilder().clearWriterSchema(); - } - return requestBuilder.build(); - } - private void cleanupInflightRequests() { Throwable finalStatus = new Exceptions.StreamWriterClosedException( @@ -676,6 +699,16 @@ private static final class AppendRequestAndResponse { } } + /** Returns the current workload of this worker. */ + public Load getLoad() { + return Load.create( + inflightBytes, + inflightRequests, + destinationSet.size(), + maxInflightBytes, + maxInflightRequests); + } + /** * Represents the current workload for this worker. Used by the multiplexing algorithm to * determine the distribution of requests.
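The three branches above fully determine which headers each AppendRowsRequest carries on the wire. As a minimal sketch of that contract (the trimHeaders helper below is hypothetical and only mirrors the patch's branch logic; it is not part of the change):

```java
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;

/** Sketch only: mirrors the header-trimming branches in appendLoop() above. */
final class MultiplexingHeaderSketch {
  static AppendRowsRequest trimHeaders(
      AppendRowsRequest request,
      boolean firstRequestForDestinationSwitch,
      boolean isMultiplexing) {
    AppendRowsRequest.Builder builder = request.toBuilder();
    if (firstRequestForDestinationSwitch) {
      // First request after a destination switch: keep both stream name and writer schema.
      return builder.build();
    }
    if (isMultiplexing) {
      // Steady state while multiplexing: keep the stream name, drop the schema.
      builder.getProtoRowsBuilder().clearWriterSchema();
      return builder.build();
    }
    // Steady state on a single table: drop both the stream name and the schema.
    builder.clearWriteStream();
    builder.getProtoRowsBuilder().clearWriterSchema();
    return builder.build();
  }
}
```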
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 35eca74eec..922dd66e81 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -43,6 +43,9 @@ public class StreamWriter implements AutoCloseable { */ private final String streamName; + /** Every writer has a fixed proto schema. */ + private final ProtoSchema writerSchema; + /* * A String that uniquely identifies this writer. */ @@ -56,6 +59,7 @@ public static long getApiMaxRequestBytes() { private StreamWriter(Builder builder) throws IOException { BigQueryWriteClient client; this.streamName = builder.streamName; + this.writerSchema = builder.writerSchema; boolean ownsBigQueryWriteClient; if (builder.client == null) { BigQueryWriteSettings stubSettings = @@ -123,7 +127,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) { * @return the append response wrapped in a future. */ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) { - return this.connectionWorker.append(rows, offset); + return this.connectionWorker.append(streamName, writerSchema, rows, offset); } /** diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 35d8d5cf09..e6067be735 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -17,13 +17,228 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.cloud.bigquery.storage.test.Test.ComplicateType; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.test.Test.InnerType; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Int64Value; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class ConnectionWorkerTest { + private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1"; + private static final String TEST_STREAM_2 = "projects/p2/datasets/d2/tables/t2/streams/s2"; + private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; + + private FakeBigQueryWrite testBigQueryWrite; + private FakeScheduledExecutorService fakeExecutor; + private static MockServiceHelper serviceHelper; + private BigQueryWriteClient client; + + @Before + public void setUp() throws Exception { + testBigQueryWrite = new FakeBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite)); + serviceHelper.start(); + fakeExecutor = new
FakeScheduledExecutorService(); + testBigQueryWrite.setExecutor(fakeExecutor); + client = + BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + } + + @Test + public void testMultiplexedAppendSuccess() throws Exception { + try (ConnectionWorker connectionWorker = createConnectionWorker()) { + long appendCount = 20; + for (long i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>(); + // We do a pattern of: + // send to stream1, string1 + // send to stream1, string2 + // send to stream2, string3 + // send to stream2, string4 + // send to stream1, string5 + // ... + for (long i = 0; i < appendCount; i++) { + switch ((int) i % 4) { + case 0: + case 1: + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_1, + createProtoSchema("foo"), + createFooProtoRows(new String[] {String.valueOf(i)}), + i)); + break; + case 2: + case 3: + futures.add( + sendTestMessage( + connectionWorker, + TEST_STREAM_2, + createProtoSchema("complicate"), + createComplicateTypeProtoRows(new String[] {String.valueOf(i)}), + i)); + break; + default: // fall out + break; + } + } + // In the real world the response won't contain an offset for the default stream, but we use + // the offset here just to test the response. + for (int i = 0; i < appendCount; i++) { + Int64Value offset = futures.get(i).get().getAppendResult().getOffset(); + assertThat(offset).isEqualTo(Int64Value.of(i)); + } + assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount()) + .isGreaterThan(0); + assertThat(serverRequest.getOffset().getValue()).isEqualTo(i); + + // We should get the requests in the pattern: + // (writer_stream: t1, schema: t1) + // (writer_stream: _, schema: _) + // (writer_stream: t2, schema: t2) -> multiplexing entered. + // (writer_stream: t2, schema: _) + // (writer_stream: t1, schema: t1) + // (writer_stream: t1, schema: _) + switch (i % 4) { + case 0: + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); + assertThat( + serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) + .isEqualTo("foo"); + break; + case 1: + // The write stream is empty until we enter multiplexing. + if (i == 1) { + assertThat(serverRequest.getWriteStream()).isEmpty(); + } else { + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); + } + // Schema is empty if not at the first request after table switch. + assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); + break; + case 2: + // Stream name is always populated after multiplexing. + assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2); + // Schema is populated after table switch. + assertThat( + serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) + .isEqualTo("complicate"); + break; + case 3: + // Schema is empty if not at the first request after table switch. + assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); + // Stream name is always populated after multiplexing.
+ assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2); + break; + default: // fall out + break; + } + } + + assertThat(connectionWorker.getLoad().destinationCount()).isEqualTo(2); + assertThat(connectionWorker.getLoad().inFlightRequestsBytes()).isEqualTo(0); + } + } + + private AppendRowsResponse createAppendResponse(long offset) { + return AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build()) + .build(); + } + + private ConnectionWorker createConnectionWorker() throws IOException { + // By default use only the first table as table reference. + return createConnectionWorker(TEST_STREAM_1, TEST_TRACE_ID, 100, 1000); + } + + private ConnectionWorker createConnectionWorker( + String streamName, String traceId, long maxRequests, long maxBytes) throws IOException { + return new ConnectionWorker( + streamName, + createProtoSchema("foo"), + maxRequests, + maxBytes, + FlowController.LimitExceededBehavior.Block, + traceId, + client, + /*ownsBigQueryWriteClient=*/ false); + } + + private ProtoSchema createProtoSchema(String protoName) { + return ProtoSchema.newBuilder() + .setProtoDescriptor( + DescriptorProtos.DescriptorProto.newBuilder() + .setName(protoName) + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("foo") + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setNumber(1) + .build()) + .build()) + .build(); + } + + private ApiFuture<AppendRowsResponse> sendTestMessage( + ConnectionWorker connectionWorker, + String streamName, + ProtoSchema protoSchema, + ProtoRows protoRows, + long offset) { + return connectionWorker.append(streamName, protoSchema, protoRows, offset); + } + + private ProtoRows createFooProtoRows(String[] messages) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo = FooType.newBuilder().setFoo(message).build(); + rowsBuilder.addSerializedRows(foo.toByteString()); + } + return rowsBuilder.build(); + } + + private ProtoRows createComplicateTypeProtoRows(String[] messages) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + for (String message : messages) { + ComplicateType complicateType = + ComplicateType.newBuilder() + .setInnerType(InnerType.newBuilder().addValue(message)) + .build(); + rowsBuilder.addSerializedRows(complicateType.toByteString()); + } + return rowsBuilder.build(); + } + @Test public void testLoadCompare_compareLoad() { // In-flight bytes are split into buckets of 1024 bytes.
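The bucketing arithmetic exercised by testLoadCompare_compareLoad is easy to spell out with the same numbers. The sketch below assumes it sits in the com.google.cloud.bigquery.storage.v1 package (as the test does) so it can reach the package-private Load.create(...); it is illustrative only, and the asserts need -ea to fire:

```java
package com.google.cloud.bigquery.storage.v1;

import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;

/** Worked numbers for Load's bucketed comparator and overwhelm thresholds (sketch only). */
final class LoadMathSketch {
  public static void main(String[] args) {
    // Bytes compare in 1024-byte buckets: 1000 -> bucket 0, 2000 -> bucket 1, so load1
    // sorts first even though its request count (2000) and destination count (100) are larger.
    Load load1 = ConnectionWorker.Load.create(1000, 2000, 100, 1000, 10);
    Load load2 = ConnectionWorker.Load.create(2000, 1000, 10, 1000, 10);
    assert Load.LOAD_COMPARATOR.compare(load1, load2) < 0;

    // Default overwhelm thresholds: count above 0.5 * maxInflightCount, or bytes above
    // 0.6 * maxInflightBytes. Here 60 > 0.6 * 90 = 54, so the bytes check trips even
    // though the request count stays under 0.5 * 100 = 50.
    Load byBytes = ConnectionWorker.Load.create(60, 10, 100, 90, 100);
    assert byBytes.isOverwhelmed();
  }
}
```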
From 3ba7659f46526a62fe54907dca8235a4196d5783 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 16 Sep 2022 19:42:28 +0000 Subject: [PATCH 05/12] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6cbfc72ed5..3a83102c76 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.20.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.21.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.20.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.21.0" ``` ## Authentication From f379a78851b33ee4e109db7349e32575a3992f05 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 16 Sep 2022 19:42:28 +0000 Subject: [PATCH 06/12] Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6cbfc72ed5..3a83102c76 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.20.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.21.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.20.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.21.0" ``` ## Authentication From de73013ba74437f8c26c9e97b8630d1c5ce31d41 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 16 Sep 2022 21:43:24 +0000 Subject: [PATCH 07/12] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6cbfc72ed5..3a83102c76 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.20.1' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.21.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.20.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.21.0" ``` ## Authentication From 19005a1b9894a5ee001fd0cf18e812efe51de741 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Mon, 19 Sep 2022 12:20:31 -0700 Subject: [PATCH 08/12] feat: port the multiplexing client core algorithm and basic tests also fixed a tiny 
bug inside fake bigquery write impl for getting the response from offset --- .../storage/v1/ConnectionWorkerPool.java | 195 ++++++++++++++- .../bigquery/storage/v1/StreamWriter.java | 5 + .../storage/v1/ConnectionWorkerPoolTest.java | 232 ++++++++++++++++++ .../storage/v1/FakeBigQueryWriteImpl.java | 9 +- 4 files changed, 437 insertions(+), 4 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index a4642a96b0..e3b335bb4c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -18,9 +18,28 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.FlowController; import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; +/** + * Pool of connections used to accept appends and distribute them to different connections. + */ public class ConnectionWorkerPool { + private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName()); /* * Max allowed inflight requests in the stream. Method append is blocked at this. */ @@ -36,11 +55,29 @@ public class ConnectionWorkerPool { */ private final FlowController.LimitExceededBehavior limitExceededBehavior; + /** Map from write stream to corresponding connection. */ + private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection = + new ConcurrentHashMap<>(); + + /** Map from a connection to the set of write streams that have sent requests onto it. */ + private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream = + new ConcurrentHashMap<>(); + + /** Collection of all the created connections. */ + private final Set<ConnectionWorker> connectionWorkerPool = + Collections.synchronizedSet(new HashSet<>()); + + /** Enable test related logic. */ + private static boolean enableTesting = false; + /* * TraceId for debugging purpose. */ private final String traceId; + /** Used in tests to count the number of times createConnectionWorker(...) is called. */ + private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0); + /* * Tracks current inflight requests in the stream. */ @@ -102,6 +139,15 @@ public class ConnectionWorkerPool { */ private boolean ownsBigQueryWriteClient = false; + /** + * The current maximum connection count. This value is gradually increased until the + * user-defined maximum connection count is reached. + */ + private int currentMaxConnectionCount; + + /** Lock for controlling concurrent operations on add / delete connections. */ + private final Lock lock = new ReentrantLock(); + /** Settings for connection pool.
*/ @AutoValue public abstract static class Settings { @@ -147,6 +193,7 @@ public ConnectionWorkerPool( this.traceId = traceId; this.client = client; this.ownsBigQueryWriteClient = ownsBigQueryWriteClient; + this.currentMaxConnectionCount = settings.minConnectionsPerPool(); } /** @@ -160,13 +207,157 @@ public static void setOptions(Settings settings) { /** Distributes the writing of a message to an underlying connection. */ public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) { - throw new RuntimeException("Append is not implemented!"); + return append(streamWriter, rows, -1); } /** Distributes the writing of a message to an underlying connection. */ public ApiFuture<AppendRowsResponse> append( StreamWriter streamWriter, ProtoRows rows, long offset) { - throw new RuntimeException("append with offset is not implemented on connection pool!"); + // We are in multiplexing mode after entering the following logic. + ConnectionWorker connectionWorker = + streamWriterToConnection.compute( + streamWriter, + (key, existingStream) -> { + // Though compute on a concurrent map is atomic, we still do explicit locking as we + // may have a concurrent close(...) triggered. + lock.lock(); + try { + // Stick to the existing stream if it's not overwhelmed. + if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) { + return existingStream; + } + // Try to create or find another existing stream to reuse. + ConnectionWorker createdOrExistingConnection = null; + try { + createdOrExistingConnection = + createOrReuseConnectionWorker(streamWriter, existingStream); + } catch (IOException e) { + throw new IllegalStateException(e); + } + // Update the connection to write stream relationship. + connectionToWriteStream.computeIfAbsent( + createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>()); + connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter); + return createdOrExistingConnection; + } finally { + lock.unlock(); + } + }); + Stopwatch stopwatch = Stopwatch.createStarted(); + ApiFuture<AppendRowsResponse> responseFuture = + connectionWorker.append( + streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset); + return responseFuture; + } + + /** + * Create a new connection if we haven't reached the current maximum, or reuse an existing + * connection by scanning the pool and picking the one with the lowest load. + * +

Note: this method is called under the pool lock, so only one thread + * can execute it at a time. + */ + private ConnectionWorker createOrReuseConnectionWorker( + StreamWriter streamWriter, + ConnectionWorker existingConnectionWorker) throws IOException { + if (connectionWorkerPool.size() < currentMaxConnectionCount) { + // Always create a new connection if we haven't reached the current maximum. + return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); + } else { + ConnectionWorker existingBestConnection = + pickBestLoadConnection( + enableTesting ? Load.TEST_LOAD_COMPARATOR : Load.LOAD_COMPARATOR, + ImmutableList.copyOf(connectionWorkerPool)); + if (!existingBestConnection.getLoad().isOverwhelmed()) { + return existingBestConnection; + } else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) { + // At this point, we have reached the connection cap and the selected connection is + // overwhelmed, so we can try to scale up the connection pool. + // The connection count will go up one by one until `maxConnectionsPerPool` is reached. + currentMaxConnectionCount += 1; + if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) { + currentMaxConnectionCount = settings.maxConnectionsPerPool(); + } + return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); + } else { + // Stick to the original connection if all the connections are overwhelmed. + if (existingConnectionWorker != null) { + return existingConnectionWorker; + } + // If we are at this branch, it means we reached the maximum connections. + return existingBestConnection; + } + } + } + + + /** + * Selects the connection worker with the lowest load among the given connection workers. + */ + static ConnectionWorker pickBestLoadConnection( + Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) { + if (connectionWorkerList.isEmpty()) { + throw new IllegalStateException( + "Bug in code! At least one connection worker should be passed in " + + "pickBestLoadConnection(...)"); + } + // Loop over all the given connection workers and find the one with the smallest load, + // using the provided load comparator. + int currentBestIndex = 0; + Load currentBestLoad = connectionWorkerList.get(currentBestIndex).getLoad(); + for (int i = 1; i < connectionWorkerList.size(); i++) { + Load loadToCompare = connectionWorkerList.get(i).getLoad(); + if (comparator.compare(loadToCompare, currentBestLoad) <= 0) { + currentBestIndex = i; + currentBestLoad = loadToCompare; + } + } + return connectionWorkerList.get(currentBestIndex); + } + + + /** + * Creates a single connection worker. + * +

Note this function needs to be thread-safe across different stream references, but not for + * a single stream reference. This is because createConnectionWorker(...) is called via + * computeIfAbsent(...) which is at most once per key. + */ + private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema writeSchema) + throws IOException { + if (enableTesting) { + // Though the atomic integer is lightweight, keep the extra if check in case future logic + // is added. + testValueCreateConnectionCount.getAndIncrement(); + } + ConnectionWorker connectionWorker = + new ConnectionWorker( + streamName, + writeSchema, + maxInflightRequests, + maxInflightBytes, + limitExceededBehavior, + traceId, + client, + ownsBigQueryWriteClient); + connectionWorkerPool.add(connectionWorker); + log.info( + String.format( + "Scaling up new connection for stream name: %s, pool size after scaling up %s", + streamName, connectionWorkerPool.size())); + return connectionWorker; + } + + /** Enable test related logic. */ + public static void enableTestingLogic() { + enableTesting = true; + } + + /** Returns how many times createConnectionWorker(...) has been called. */ + int getCreateConnectionCount() { + return testValueCreateConnectionCount.get(); } /** Close the stream writer. Shut down all resources. */ diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 922dd66e81..e869668818 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -152,6 +152,11 @@ public String getStreamName() { return streamName; } + /** @return the passed in user schema. */ + public ProtoSchema getProtoSchema() { + return writerSchema; + } + /** Close the stream writer. Shut down all resources. */ @Override public void close() { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java new file mode 100644 index 0000000000..9213974e36 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -0,0 +1,232 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.google.cloud.bigquery.storage.v1; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Int64Value; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class ConnectionWorkerPoolTest { + + private FakeBigQueryWrite testBigQueryWrite; + private FakeScheduledExecutorService fakeExecutor; + private static MockServiceHelper serviceHelper; + private BigQueryWriteClient client; + + private static final String TEST_TRACE_ID = "home:job1"; + private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default"; + private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default"; + + @Before + public void setUp() throws Exception { + testBigQueryWrite = new FakeBigQueryWrite(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite)); + serviceHelper.start(); + fakeExecutor = new FakeScheduledExecutorService(); + testBigQueryWrite.setExecutor(fakeExecutor); + client = + BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + } + + @Test + public void testSingleTableConnection_noOverwhelmedConnection() throws Exception { + // Set the max requests count to a large value so we will not scale up. + testSend100RequestsToMultiTable( + /*maxRequests=*/100000, + /*maxConnections=*/8, + /*expectedConnectionCount=*/1, + /*tableCount=*/1); + } + + @Test + public void testSingleTableConnections_overwhelmed() throws Exception { + // A connection will be considered overwhelmed when the requests count reaches 5 (max 10). + testSend100RequestsToMultiTable( + /*maxRequests=*/10, + /*maxConnections=*/8, + /*expectedConnectionCount=*/8, + /*tableCount=*/1); + } + + @Test + public void testMultiTableConnection_noOverwhelmedConnection() throws Exception { + // Set the max requests count to a large value so we will not scale up. + // All tables will share the same connections. + testSend100RequestsToMultiTable( + /*maxRequests=*/100000, + /*maxConnections=*/8, + /*expectedConnectionCount=*/1, + /*tableCount=*/4); + } + + @Test + public void testMultiTableConnections_overwhelmed() throws Exception { + // A connection will be considered overwhelmed when the requests count reaches 5 (max 10).
+ testSend100RequestsToMultiTable( + /*maxRequests=*/10, + /*maxConnections=*/8, + /*expectedConnectionCount=*/8, + /*tableCount=*/4); + } + + private void testSend100RequestsToMultiTable( + int maxRequests, + int maxConnections, + int expectedConnectionCount, + int tableCount) throws IOException, ExecutionException, InterruptedException { + ConnectionWorkerPool connectionWorkerPool = + createConnectionWorkerPool( + maxRequests, + /*maxBytes=*/100000); + ConnectionWorkerPool.setOptions( + Settings.builder() + .setMaxConnectionsPerPool(maxConnections) + .build()); + + // Sets the sleep time to simulate requests stuck in connection. + testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L)); + + // Try appending 100 requests. + long appendCount = 100; + for (long i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>(); + + // Create one stream writer per table. + List<StreamWriter> streamWriterList = new ArrayList<>(); + for (int i = 0; i < tableCount; i++) { + streamWriterList.add(getTestStreamWriter( + String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i))); + } + + for (long i = 0; i < appendCount; i++) { + // Insert requests into the different tables in a round-robin fashion. + futures.add( + sendFooStringTestMessage( + streamWriterList.get((int) (i % streamWriterList.size())), + connectionWorkerPool, + new String[] {String.valueOf(i)}, + i)); + } + + for (int i = 0; i < appendCount; i++) { + AppendRowsResponse response = futures.get(i).get(); + assertThat(response.getAppendResult().getOffset().getValue()).isEqualTo(i); + } + // At the end we should have scaled up to the expected connection count. + assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(expectedConnectionCount); + + assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount); + // The order in which the server receives requests is no longer guaranteed, so collect the + // offsets as a set. + HashSet<Long> offsets = new HashSet<>(); + for (int i = 0; i < appendCount; i++) { + AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i); + assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount()) + .isGreaterThan(0); + offsets.add(serverRequest.getOffset().getValue()); + } + assertThat(offsets.size()).isEqualTo(appendCount); + } + + private AppendRowsResponse createAppendResponse(long offset) { + return AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build()) + .build(); + } + + private StreamWriter getTestStreamWriter(String streamName) throws IOException { + return StreamWriter.newBuilder(streamName, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .build(); + } + + private ProtoSchema createProtoSchema() { + return ProtoSchema.newBuilder() + .setProtoDescriptor( + DescriptorProtos.DescriptorProto.newBuilder() + .setName("Message") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("foo") + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) + .setNumber(1) + .build()) + .build()) + .build(); + } + + private ApiFuture<AppendRowsResponse> sendFooStringTestMessage( + StreamWriter writeStream, + ConnectionWorkerPool connectionWorkerPool, + String[] messages, + long offset) { + return connectionWorkerPool.append( + writeStream, + createProtoRows(messages), + offset); + } + + private ProtoRows createProtoRows(String[] messages) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + for (String message : messages) { + FooType foo =
FooType.newBuilder().setFoo(message).build(); + rowsBuilder.addSerializedRows(foo.toByteString()); + } + return rowsBuilder.build(); + } + + ConnectionWorkerPool createConnectionWorkerPool(long maxRequests, long maxBytes) { + ConnectionWorkerPool.enableTestingLogic(); + return new ConnectionWorkerPool( + maxRequests, + maxBytes, + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client, + /*ownsBigQueryWriteClient=*/false); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java index 5d8f05fff5..fab2274578 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java @@ -20,6 +20,7 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -40,7 +41,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private final LinkedBlockingQueue writeRequests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue flushRequests = new LinkedBlockingQueue<>(); - private final LinkedBlockingQueue<Response> responses = new LinkedBlockingQueue<>(); + private final List<Response> responses = Collections.synchronizedList(new ArrayList<>()); private final LinkedBlockingQueue writeResponses = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue flushResponses = new LinkedBlockingQueue<>(); private final AtomicInteger nextMessageId = new AtomicInteger(1); @@ -143,6 +144,10 @@ public void onNext(AppendRowsRequest value) { LOG.fine("Get request:" + value.toString()); requests.add(value); recordCount++; + long offset = value.getOffset().getValue(); + if (offset == -1) { + offset = recordCount; + } if (responseSleep.compareTo(Duration.ZERO) > 0) { LOG.fine("Sleeping before response for " + responseSleep.toString()); Uninterruptibles.sleepUninterruptibly( @@ -168,7 +173,7 @@ public void onNext(AppendRowsRequest value) { LOG.info("Shutting down connection from test..."); responseObserver.onError(Status.ABORTED.asException()); } else { - final Response response = responses.remove(); + final Response response = responses.get((int) offset); sendResponse(response, responseObserver); } } From 644360a03df0e472824da86c06c24b72802ce780 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 20 Sep 2022 00:54:20 +0000 Subject: [PATCH 09/12] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3a83102c76..61faf06bac 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.1.1') +implementation platform('com.google.cloud:libraries-bom:26.1.2') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` From
44c36fc648b112a9574eda1c7c5e95fb8aa42c50 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Tue, 20 Sep 2022 14:42:58 -0700 Subject: [PATCH 10/12] feat: wire multiplexing connection pool to stream writer --- .../storage/v1/ConnectionWorkerPool.java | 17 ++ .../bigquery/storage/v1/StreamWriter.java | 218 ++++++++++++++++-- .../storage/v1/ConnectionWorkerPoolTest.java | 46 +++- .../bigquery/storage/v1/StreamWriterTest.java | 21 +- 4 files changed, 276 insertions(+), 26 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 8cfd30a800..7b0d3a2964 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -167,6 +167,7 @@ public static Builder builder() { /** Builder for the options to config {@link ConnectionWorkerPool}. */ @AutoValue.Builder public abstract static class Builder { + // TODO(gaole): rename to perLocation for easier understanding. public abstract Builder setMinConnectionsPerPool(int value); public abstract Builder setMaxConnectionsPerPool(int value); @@ -387,4 +388,20 @@ int getCreateConnectionCount() { int getTotalConnectionCount() { return connectionWorkerPool.size(); } + + String getTraceId() { + return traceId; + } + + boolean ownsBigQueryWriteClient() { + return ownsBigQueryWriteClient; + } + + FlowController.LimitExceededBehavior limitExceededBehavior() { + return limitExceededBehavior; + } + + BigQueryWriteClient bigQueryWriteClient() { + return client; + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index e869668818..180ee81d94 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -20,12 +20,19 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.auto.value.AutoOneOf; +import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.util.Map; +import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; /** @@ -36,8 +43,6 @@ public class StreamWriter implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); - private final ConnectionWorker connectionWorker; - /* * The identifier of stream to write to. */ @@ -51,11 +56,108 @@ public class StreamWriter implements AutoCloseable { */ private final String writerId = UUID.randomUUID().toString(); + /** + * A stream can access a single connection or a pool of connections depending on whether + * multiplexing is enabled.
+ */ + private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool; + + /** + * Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be + * shared by every stream writer in the same process. + */ + private static final Map<ConnectionPoolKey, ConnectionWorkerPool> connectionPoolMap = + new ConcurrentHashMap<>(); + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) } + /** + * Connection pools with different keys are kept separate. + * +

Sharded based only on location right now. */ + @AutoValue + abstract static class ConnectionPoolKey { + abstract String location(); + + public static ConnectionPoolKey create(String location) { + return new AutoValue_StreamWriter_ConnectionPoolKey(location); + } + } + + /** + * When in single table mode, append directly to connectionWorker. Otherwise append to the + * connection pool in multiplexing mode. + */ + @AutoOneOf(SingleConnectionOrConnectionPool.Kind.class) + public abstract static class SingleConnectionOrConnectionPool { + /** Kind of connection operation mode. */ + public enum Kind { + CONNECTION_WORKER, + CONNECTION_WORKER_POOL + } + + public abstract Kind getKind(); + + public abstract ConnectionWorker connectionWorker(); + + public abstract ConnectionWorkerPool connectionWorkerPool(); + + public ApiFuture<AppendRowsResponse> append( + StreamWriter streamWriter, ProtoRows protoRows, long offset) { + if (getKind() == Kind.CONNECTION_WORKER) { + return connectionWorker() + .append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset); + } else { + return connectionWorkerPool().append(streamWriter, protoRows, offset); + } + } + + public void close(StreamWriter streamWriter) { + if (getKind() == Kind.CONNECTION_WORKER) { + connectionWorker().close(); + } else { + connectionWorkerPool().close(streamWriter); + } + } + + long getInflightWaitSeconds() { + if (getKind() == Kind.CONNECTION_WORKER_POOL) { + throw new IllegalStateException( + "getInflightWaitSeconds is not supported in multiplexing mode."); + } + return connectionWorker().getInflightWaitSeconds(); + } + + TableSchema getUpdatedSchema() { + if (getKind() == Kind.CONNECTION_WORKER_POOL) { + // TODO(gaole): implement updated schema support for multiplexing. + throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing."); + } + return connectionWorker().getUpdatedSchema(); + } + + String getWriterId(String streamWriterId) { + if (getKind() == Kind.CONNECTION_WORKER_POOL) { + return streamWriterId; + } + return connectionWorker().getWriterId(); + } + + public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) { + return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection); + } + + public static SingleConnectionOrConnectionPool ofConnectionPool( + ConnectionWorkerPool connectionPool) { + return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorkerPool( + connectionPool); + } + } + private StreamWriter(Builder builder) throws IOException { BigQueryWriteClient client; this.streamName = builder.streamName; @@ -78,16 +180,66 @@ private StreamWriter(Builder builder) throws IOException { client = builder.client; ownsBigQueryWriteClient = false; } - connectionWorker = - new ConnectionWorker( - builder.streamName, - builder.writerSchema, - builder.maxInflightRequest, - builder.maxInflightBytes, - builder.limitExceededBehavior, - builder.traceId, - client, - ownsBigQueryWriteClient); + if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) { + this.singleConnectionOrConnectionPool = + SingleConnectionOrConnectionPool.ofSingleConnection( + new ConnectionWorker( + builder.streamName, + builder.writerSchema, + builder.maxInflightRequest, + builder.maxInflightBytes, + builder.limitExceededBehavior, + builder.traceId, + client, + ownsBigQueryWriteClient)); + } else { + if (builder.location == null || builder.location.isEmpty()) { + throw new IllegalArgumentException("Location must be specified for multiplexing client!"); + } + // Assume the connections
in the same pool share the same client and trace id. + // The first StreamWriter for a new stub will create the pool for the other + // streams in the same region, meaning the per-StreamWriter settings no + // longer apply unless all streams share the same set of settings. + this.singleConnectionOrConnectionPool = + SingleConnectionOrConnectionPool.ofConnectionPool( + connectionPoolMap.computeIfAbsent( + ConnectionPoolKey.create(builder.location), + (key) -> + new ConnectionWorkerPool( + builder.maxInflightRequest, + builder.maxInflightBytes, + builder.limitExceededBehavior, + builder.traceId, + client, + ownsBigQueryWriteClient))); + validateFetchedConnectionPool(client, builder); + } + } + + // Validate whether the fetched connection pool matches certain properties. + private void validateFetchedConnectionPool( + BigQueryWriteClient client, StreamWriter.Builder builder) { + String paramsValidatedFailed = ""; + if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), + builder.traceId)) { + paramsValidatedFailed = "Trace id"; + } else if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(), + client)) { + paramsValidatedFailed = "Bigquery write client"; + } else if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), + builder.limitExceededBehavior)) { + paramsValidatedFailed = "Limit exceeded behavior"; + } + + if (!paramsValidatedFailed.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "%s used for the same connection pool for the same location must be the same!", + paramsValidatedFailed)); + } } /** @@ -127,7 +279,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) { * @return the append response wrapped in a future. */ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) { - return this.connectionWorker.append(streamName, writerSchema, rows, offset); + return this.singleConnectionOrConnectionPool.append(this, rows, offset); } /** @@ -139,12 +291,12 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) { * stream case. */ public long getInflightWaitSeconds() { - return connectionWorker.getInflightWaitSeconds(); + return singleConnectionOrConnectionPool.getInflightWaitSeconds(); } /** @return a unique Id for the writer. */ public String getWriterId() { - return connectionWorker.getWriterId(); + return singleConnectionOrConnectionPool.getWriterId(writerId); } /** @return name of the Stream that this writer is working on. */ @@ -160,7 +312,7 @@ public ProtoSchema getProtoSchema() { /** Close the stream writer. Shut down all resources. */ @Override public void close() { - this.connectionWorker.close(); + singleConnectionOrConnectionPool.close(this); } /** @@ -179,11 +331,28 @@ public static StreamWriter.Builder newBuilder(String streamName) { /** Thread-safe getter of updated TableSchema */ public synchronized TableSchema getUpdatedSchema() { - return connectionWorker.getUpdatedSchema(); + return singleConnectionOrConnectionPool.getUpdatedSchema(); + } + + @VisibleForTesting + SingleConnectionOrConnectionPool.Kind getConnectionOperationType() { + return singleConnectionOrConnectionPool.getKind(); } /** A builder of {@link StreamWriter}s. */ public static final class Builder { + /** Operation mode for the internal connection pool. */ + public enum ConnectionMode { + // Create a connection per given write stream. + SINGLE_TABLE, + // Share a connection for multiple tables. This mode is only effective in the default + // stream case.
+ // Some key characteristics: + // 1. Tables within the same pool have to be in the same location. + // 2. Close(streamReference) will not close the connection immediately; the connection is + // closed once all tables on it are closed. + // 3. Try to use one stream per table at first and share streams later. + MULTIPLEXING + } private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L; @@ -210,10 +379,14 @@ public static final class Builder { private FlowController.LimitExceededBehavior limitExceededBehavior = FlowController.LimitExceededBehavior.Block; + private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE; + private String traceId = null; private TableSchema updatedTableSchema = null; + private String location; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -246,6 +419,11 @@ public Builder setEndpoint(String endpoint) { return this; } + public Builder enableConnectionPool() { + this.connectionMode = ConnectionMode.MULTIPLEXING; + return this; + } + /** * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage * API endpoint. @@ -280,6 +458,12 @@ public Builder setTraceId(String traceId) { return this; } + /** Location of the table this stream writer is targeting. */ + public Builder setLocation(String location) { + this.location = location; + return this; + } + /** * Sets the limit exceeded behavior. * diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 1bc180b814..8b865eb13a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -73,7 +73,8 @@ public void setUp() throws Exception { @Test public void testSingleTableConnection_noOverwhelmedConnection() throws Exception { // Set the max requests count to a large value so we will not scale up. - testSend100RequestsToMultiTable( + testSendRequestsToMultiTable( + /*requestToSend=*/ 100, /*maxRequests=*/ 100000, /*maxConnections=*/ 8, /*expectedConnectionCount=*/ 1, @@ -83,7 +84,8 @@ public void testSingleTableConnection_noOverwhelmedConnection() throws Exception @Test public void testSingleTableConnections_overwhelmed() throws Exception { // A connection will be considered overwhelmed when the requests count reaches 5 (max 10). - testSend100RequestsToMultiTable( + testSendRequestsToMultiTable( + /*requestToSend=*/ 100, /*maxRequests=*/ 10, /*maxConnections=*/ 8, /*expectedConnectionCount=*/ 8, @@ -94,7 +96,8 @@ public void testSingleTableConnections_overwhelmed() throws Exception { public void testMultiTableConnection_noOverwhelmedConnection() throws Exception { // Set the max requests count to a large value so we will not scale up. // All tables will share the two connections (2 because we set the min connections to be 2).
- testSend100RequestsToMultiTable(
+ testSendRequestsToMultiTable(
+ /*requestToSend=*/ 100,
 /*maxRequests=*/ 100000,
 /*maxConnections=*/ 8,
 /*expectedConnectionCount=*/ 2,
@@ -102,17 +105,44 @@ public void testMultiTableConnection_noOverwhelmedConnection() throws Exception
 }

 @Test
- public void testMultiTableConnections_overwhelmed() throws Exception {
+ public void testMultiTableConnections_overwhelmed_reachingMaximum() throws Exception {
 // A connection will be considered overwhelmed when the requests count reaches 5 (max 10).
- testSend100RequestsToMultiTable(
+ testSendRequestsToMultiTable(
+ /*requestToSend=*/ 100,
 /*maxRequests=*/ 10,
 /*maxConnections=*/ 8,
 /*expectedConnectionCount=*/ 8,
 /*tableCount=*/ 4);
 }

- private void testSend100RequestsToMultiTable(
- int maxRequests, int maxConnections, int expectedConnectionCount, int tableCount)
+ @Test
+ public void testMultiTableConnections_overwhelmed_overTotalLimit() throws Exception {
+ // A connection will be considered overwhelmed when the requests count reaches 5 (max 10).
+ testSendRequestsToMultiTable(
+ /*requestToSend=*/ 200,
+ /*maxRequests=*/ 10,
+ /*maxConnections=*/ 8,
+ /*expectedConnectionCount=*/ 8,
+ /*tableCount=*/ 10);
+ }
+
+ @Test
+ public void testMultiTableConnections_overwhelmed_notReachingMaximum() throws Exception {
+ // A connection will be considered overwhelmed when the requests count reaches 5 (max 10).
+ testSendRequestsToMultiTable(
+ /*requestToSend=*/ 20,
+ /*maxRequests=*/ 10,
+ /*maxConnections=*/ 8,
+ /*expectedConnectionCount=*/ 4,
+ /*tableCount=*/ 4);
+ }
+
+ private void testSendRequestsToMultiTable(
+ int requestToSend,
+ int maxRequests,
+ int maxConnections,
+ int expectedConnectionCount,
+ int tableCount)
 throws IOException, ExecutionException, InterruptedException {
 ConnectionWorkerPool.setOptions(
 Settings.builder()
@@ -126,7 +156,7 @@ private void testSend100RequestsToMultiTable(
 testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));

 // Try to append `appendCount` requests.
- long appendCount = 100; + long appendCount = requestToSend; for (long i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index ef50c40977..04725ba97b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -30,6 +30,7 @@ import com.google.api.gax.rpc.UnknownException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; +import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.base.Strings; import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; @@ -90,6 +91,15 @@ public void tearDown() throws Exception { serviceHelper.stop(); } + private StreamWriter getMultiplexingTestStreamWriter() throws IOException { + return StreamWriter.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setLocation("US") + .enableConnectionPool() + .build(); + } + private StreamWriter getTestStreamWriter() throws IOException { return StreamWriter.newBuilder(TEST_STREAM, client) .setWriterSchema(createProtoSchema()) @@ -196,7 +206,6 @@ private void verifyAppendRequests(long appendCount) { } } - @Test public void testBuildBigQueryWriteClientInWriter() throws Exception { StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM) @@ -703,6 +712,16 @@ public void testWriterId() Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId()); } + @Test + public void testInitialization_operationKind() throws Exception { + try (StreamWriter streamWriter = getMultiplexingTestStreamWriter()) { + Assert.assertEquals(streamWriter.getConnectionOperationType(), Kind.CONNECTION_WORKER_POOL); + } + try (StreamWriter streamWriter = getTestStreamWriter()) { + Assert.assertEquals(streamWriter.getConnectionOperationType(), Kind.CONNECTION_WORKER); + } + } + // Timeout to ensure close() doesn't wait for done callback timeout. 
@Test(timeout = 10000)
 public void testCloseDisconnectedStream() throws Exception {

From 87a403671697ae9b4e7ed8d6e1a9bde867396079 Mon Sep 17 00:00:00 2001
From: Gaole Meng
Date: Fri, 23 Sep 2022 00:02:32 -0700
Subject: [PATCH 11/12] feat: some fixes for multiplexing client

---
 .../clirr-ignored-differences.xml | 20 +++++++
 .../storage/v1/ConnectionWorkerPool.java | 37 ++++++++----
 .../bigquery/storage/v1/JsonStreamWriter.java | 37 +++++++++++-
 .../bigquery/storage/v1/StreamWriter.java | 56 ++++++++---------
 .../storage/v1/ConnectionWorkerPoolTest.java | 8 +--
 .../bigquery/storage/v1/StreamWriterTest.java | 22 +++++++-
 6 files changed, 136 insertions(+), 44 deletions(-)

diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index 69e67b9464..d8caaa07bf 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -40,4 +40,24 @@
 <className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
 <method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
 </difference>
+ <difference>
+ <differenceType>7002</differenceType>
+ <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+ <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerPool(int)</method>
+ </difference>
+ <difference>
+ <differenceType>7013</differenceType>
+ <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+ <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerRegion(int)</method>
+ </difference>
+ <difference>
+ <differenceType>7002</differenceType>
+ <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+ <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerPool(int)</method>
+ </difference>
+ <difference>
+ <differenceType>7013</differenceType>
+ <className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
+ <method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
+ </difference>
 </differences>
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index 7b0d3a2964..cf87cbe180 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -17,6 +17,7 @@

 import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
 import com.google.common.base.Stopwatch;
@@ -153,24 +154,24 @@ public abstract static class Settings {
 * The minimum number of connections each pool creates before trying to reuse a previously
 * created connection in multiplexing mode.
 */
- abstract int minConnectionsPerPool();
+ abstract int minConnectionsPerRegion();

 /** The maximum number of connections per connection pool. */
- abstract int maxConnectionsPerPool();
+ abstract int maxConnectionsPerRegion();

 public static Builder builder() {
 return new AutoValue_ConnectionWorkerPool_Settings.Builder()
- .setMinConnectionsPerPool(2)
- .setMaxConnectionsPerPool(10);
+ .setMinConnectionsPerRegion(2)
+ .setMaxConnectionsPerRegion(10);
 }

 /** Builder for the options to configure {@link ConnectionWorkerPool}. */
 @AutoValue.Builder
 public abstract static class Builder {
 // TODO(gaole) rename to per location for easier understanding.
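For callers, the four clirr entries above reduce to a mechanical rename on the Settings builder, shown side by side below (a sketch; both lines configure the same pool bounds, using the default values from the hunk above):

    // Before (patch 10 API):
    ConnectionWorkerPool.setOptions(
        Settings.builder().setMinConnectionsPerPool(2).setMaxConnectionsPerPool(10).build());
    // After (this patch):
    ConnectionWorkerPool.setOptions(
        Settings.builder().setMinConnectionsPerRegion(2).setMaxConnectionsPerRegion(10).build());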
- public abstract Builder setMinConnectionsPerPool(int value);
+ public abstract Builder setMinConnectionsPerRegion(int value);

- public abstract Builder setMaxConnectionsPerPool(int value);
+ public abstract Builder setMaxConnectionsPerRegion(int value);

 public abstract Settings build();
 }
@@ -192,7 +193,7 @@ public ConnectionWorkerPool(
 this.traceId = traceId;
 this.client = client;
 this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
- this.currentMaxConnectionCount = settings.minConnectionsPerPool();
+ this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
 }

 /**
@@ -266,13 +267,13 @@ private ConnectionWorker createOrReuseConnectionWorker(
 ImmutableList.copyOf(connectionWorkerPool));
 if (!existingBestConnection.getLoad().isOverwhelmed()) {
 return existingBestConnection;
- } else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
+ } else if (currentMaxConnectionCount < settings.maxConnectionsPerRegion()) {
 // At this point the selected connection is overwhelmed but we have not yet reached the
 // connection cap, so we can try to scale up the connection pool.
 // The connection count will go up one by one until `maxConnectionsPerRegion` is reached.
 currentMaxConnectionCount += 1;
- if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
- currentMaxConnectionCount = settings.maxConnectionsPerPool();
+ if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
+ currentMaxConnectionCount = settings.maxConnectionsPerRegion();
 }
 return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
 } else {
@@ -323,6 +324,20 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
 // Though atomic integer is super lightweight, add extra if check in case adding future logic.
 testValueCreateConnectionCount.getAndIncrement();
 }
+ // TODO(gaole): figure out a better way to handle header / request body mismatch.
+ // Currently we use a different header for the client in each connection worker,
+ // as the backend requires the header to have the same write_stream field as the request body.
+ BigQueryWriteClient clientAfterModification = client;
+ if (ownsBigQueryWriteClient) {
+ BigQueryWriteSettings settings = client.getSettings();
+ BigQueryWriteSettings stubSettings =
+ settings.toBuilder()
+ .setHeaderProvider(
+ FixedHeaderProvider.create(
+ "x-goog-request-params", "write_stream=" + streamName))
+ .build();
+ clientAfterModification = BigQueryWriteClient.create(stubSettings);
+ }
 ConnectionWorker connectionWorker =
 new ConnectionWorker(
 streamName,
@@ -331,7 +346,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
 maxInflightBytes,
 limitExceededBehavior,
 traceId,
- client,
+ clientAfterModification,
 ownsBigQueryWriteClient);
 connectionWorkerPool.add(connectionWorker);
 log.info(
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index 508b51f02d..797bef015e 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -20,6 +20,7 @@
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
+import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
@@ -60,6 +61,7 @@ public class JsonStreamWriter implements AutoCloseable {
 private long totalMessageSize = 0;
 private long absTotal = 0;
 private ProtoSchema protoSchema;
+ private boolean enableConnectionPool = false;

 /**
 * Constructs the JsonStreamWriter
@@ -87,6 +89,7 @@ private JsonStreamWriter(Builder builder)
 builder.endpoint,
 builder.flowControlSettings,
 builder.traceId);
+ streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
 this.streamWriter = streamWriterBuilder.build();
 this.streamName = builder.streamName;
 this.tableSchema = builder.tableSchema;
@@ -124,8 +127,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
 throws IOException, DescriptorValidationException {
 // Handle schema updates in a Thread-safe way by locking down the operation
 synchronized (this) {
- TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
- if (updatedSchema != null) {
+ // Schema updates only work when the connection pool is not enabled.
+ if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
+ && this.streamWriter.getUpdatedSchema() != null) {
+ TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
 // Close the StreamWriter
 this.streamWriter.close();
 // Update JsonStreamWriter's TableSchema and Descriptor
@@ -301,6 +306,9 @@ public static final class Builder {
 private String traceId;
 private boolean ignoreUnknownFields = false;
 private boolean reconnectAfter10M = false;
+ // Indicate whether multiplexing mode is enabled.
+ private boolean enableConnectionPool = false;
+ private String location;

 private static String streamPatternString =
 "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -427,6 +435,31 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
 return this;
 }

+ /**
+ * Enable multiplexing for this writer.
In multiplexing mode, tables will share the same
+ * connection if possible until the connection is overwhelmed.
+ * This feature is still under development; please contact the Write API team before using it.
+ *
+ * @param enableConnectionPool
+ * @return Builder
+ */
+ public Builder setEnableConnectionPool(boolean enableConnectionPool) {
+ this.enableConnectionPool = enableConnectionPool;
+ return this;
+ }
+
+ /**
+ * Location of the table this stream writer is targeting.
+ * Connection pools are shared by location.
+ *
+ * @param location
+ * @return Builder
+ */
+ public Builder setLocation(String location) {
+ this.location = location;
+ return this;
+ }
+
 /**
 * Builds JsonStreamWriter
 *
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index 180ee81d94..e1c7089dbb 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -22,7 +22,6 @@
 import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.auto.value.AutoOneOf;
 import com.google.auto.value.AutoValue;
-import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.grpc.Status;
@@ -33,6 +32,7 @@
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;

 /**
@@ -180,7 +180,7 @@ private StreamWriter(Builder builder) throws IOException {
 client = builder.client;
 ownsBigQueryWriteClient = false;
 }
- if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
+ if (!builder.enableConnectionPool) {
 this.singleConnectionOrConnectionPool =
 SingleConnectionOrConnectionPool.ofSingleConnection(
 new ConnectionWorker(
@@ -212,22 +212,31 @@ private StreamWriter(Builder builder) throws IOException {
 builder.traceId,
 client,
 ownsBigQueryWriteClient)));
- validateFetchedConnectonPool(client, builder);
+ validateFetchedConnectonPool(builder);
+ // Shut down the passed-in client. Internally we will create another client inside the
+ // connection pool for every new connection worker.
+ // TODO(gaole): instead of performing the close outside of the pool, change to always create
+ // a new client inside the connection.
+ if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
+ && ownsBigQueryWriteClient) {
+ client.shutdown();
+ try {
+ client.awaitTermination(150, TimeUnit.SECONDS);
+ } catch (InterruptedException unused) {
+ // Ignore interruption as this client is not used.
+ }
+ client.close();
+ }
 }
 }

 // Validate whether the fetched connection pool matches certain properties.
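The practical effect of the validation below: all pooled writers for one location must agree on the pool-level settings. A sketch of the failure mode, with made-up trace ids (streamName, schema, and client are placeholders):

    StreamWriter first =
        StreamWriter.newBuilder(streamName, client)
            .setWriterSchema(schema) // placeholder ProtoSchema
            .setLocation("US")
            .setTraceId("job:A")
            .setEnableConnectionPool(true)
            .build();
    // Throws IllegalArgumentException: "Trace id used for the same connection
    // pool in the same location must be the same!"
    StreamWriter second =
        StreamWriter.newBuilder(streamName, client)
            .setWriterSchema(schema)
            .setLocation("US")
            .setTraceId("job:B")
            .setEnableConnectionPool(true)
            .build();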
- private void validateFetchedConnectonPool(
- BigQueryWriteClient client, StreamWriter.Builder builder) {
+ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
 String paramsValidatedFailed = "";
 if (!Objects.equals(
 this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
 builder.traceId)) {
 paramsValidatedFailed = "Trace id";
- } else if (!Objects.equals(
- this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
- client)) {
- paramsValidatedFailed = "Bigquery write client";
 } else if (!Objects.equals(
 this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
 builder.limitExceededBehavior)) {
@@ -341,19 +350,6 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {

 /** A builder of {@link StreamWriter}s. */
 public static final class Builder {
- /** Operation mode for the internal connection pool. */
- public enum ConnectionMode {
- // Create a connection per given write stream.
- SINGLE_TABLE,
- // Share a connection for multiple tables. This mode is only effective in the default stream case.
- // Some key characteristics:
- // 1. tables within the same pool have to be in the same location.
- // 2. Close(streamReference) will not close the connection immediately until all tables on
- // this connection are closed.
- // 3. Try to use one stream per table at first and share streams later.
- MULTIPLEXING
- }
-
 private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;

 private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb.
@@ -379,14 +375,14 @@ public enum ConnectionMode {
 private FlowController.LimitExceededBehavior limitExceededBehavior =
 FlowController.LimitExceededBehavior.Block;

- private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;
-
 private String traceId = null;

 private TableSchema updatedTableSchema = null;

 private String location;

+ private boolean enableConnectionPool = false;
+
 private Builder(String streamName) {
 this.streamName = Preconditions.checkNotNull(streamName);
 this.client = null;
@@ -419,8 +415,16 @@ public Builder setEndpoint(String endpoint) {
 return this;
 }

- public Builder enableConnectionPool() {
- this.connectionMode = ConnectionMode.MULTIPLEXING;
+ /**
+ * Enable multiplexing for this writer. In multiplexing mode, tables will share the same
+ * connection if possible until the connection is overwhelmed.
+ * This feature is still under development; please contact the Write API team before using it.
+
+ * + * @param enableConnectionPool + * @return Builder + */ + public Builder setEnableConnectionPool(boolean enableConnectionPool) { + this.enableConnectionPool = enableConnectionPool; return this; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 8b865eb13a..fa551c0a6b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -146,8 +146,8 @@ private void testSendRequestsToMultiTable( throws IOException, ExecutionException, InterruptedException { ConnectionWorkerPool.setOptions( Settings.builder() - .setMinConnectionsPerPool(2) - .setMaxConnectionsPerPool(maxConnections) + .setMinConnectionsPerRegion(2) + .setMaxConnectionsPerRegion(maxConnections) .build()); ConnectionWorkerPool connectionWorkerPool = createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000); @@ -201,7 +201,7 @@ private void testSendRequestsToMultiTable( @Test public void testMultiStreamClosed_multiplexingEnabled() throws Exception { ConnectionWorkerPool.setOptions( - Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build()); + Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build()); ConnectionWorkerPool connectionWorkerPool = createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000); @@ -250,7 +250,7 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception { @Test public void testMultiStreamAppend_appendWhileClosing() throws Exception { ConnectionWorkerPool.setOptions( - Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build()); + Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build()); ConnectionWorkerPool connectionWorkerPool = createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 04725ba97b..94abb11a6e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -96,7 +96,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException { .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) .setLocation("US") - .enableConnectionPool() + .setEnableConnectionPool(true) .build(); } @@ -722,6 +722,26 @@ public void testInitialization_operationKind() throws Exception { } } + @Test + public void testInitializationTwice_closeSecondClient() throws Exception { + BigQueryWriteClient client2 = BigQueryWriteClient.create( + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .build()); + + StreamWriter streamWriter1 = getMultiplexingTestStreamWriter(); + StreamWriter streamWriter2 = StreamWriter.newBuilder(TEST_STREAM, client2) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setLocation("US") + .setEnableConnectionPool(true) + .build(); + + // The 
second passed-in client will be closed.
+ assertTrue(client2.isShutdown());
+ }
+
 // Timeout to ensure close() doesn't wait for done callback timeout.
 @Test(timeout = 10000)
 public void testCloseDisconnectedStream() throws Exception {

From 47893dfdb600748f317b2abf27b2cb3ceb6b8613 Mon Sep 17 00:00:00 2001
From: Gaole Meng
Date: Mon, 26 Sep 2022 17:12:07 -0700
Subject: [PATCH 12/12] feat: fix some todos, and reject the mixed behavior of
 passed in client or not

---
 .../storage/v1/ConnectionWorkerPool.java | 14 ++-
 .../bigquery/storage/v1/StreamWriter.java | 87 +++++++++++--------
 .../bigquery/storage/v1/StreamWriterTest.java | 20 +++++
 3 files changed, 80 insertions(+), 41 deletions(-)

diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index bdddeca12d..a59ce9907e 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -22,9 +22,11 @@
 import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -324,17 +326,21 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
 // Though atomic integer is super lightweight, add extra if check in case adding future logic.
 testValueCreateConnectionCount.getAndIncrement();
 }
- // TODO(gaole): figure out a better way to handle header / request body mismatch.
- // Currently we use a different header for the client in each connection worker,
+ // Currently we use a different header for the client in each connection worker,
 // as the backend requires the header to have the same write_stream field as the request body.
 BigQueryWriteClient clientAfterModification = client;
 if (ownsBigQueryWriteClient) {
 BigQueryWriteSettings settings = client.getSettings();
+
+ // Every request to the Write API is required to set write_stream in the header to help
+ // route the request to the correct region.
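Why the map merge below matters: FixedHeaderProvider.create(key, value) replaces the whole header provider, so the patch 11 version above could drop headers already configured on the client's settings. Building the map first preserves them; the same idea in isolation (existingProvider is a stand-in for the provider read from the current settings):

    Map<String, String> newHeaders = new HashMap<>(existingProvider.getHeaders());
    newHeaders.put("x-goog-request-params", "write_stream=" + streamName);
    HeaderProvider merged = FixedHeaderProvider.create(newHeaders);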
+ HashMap<String, String> newHeaders = new HashMap<>();
+ newHeaders.putAll(settings.toBuilder().getHeaderProvider().getHeaders());
+ newHeaders.put("x-goog-request-params", "write_stream=" + streamName);
 BigQueryWriteSettings stubSettings =
 settings
 .toBuilder()
- .setHeaderProvider(
- FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
+ .setHeaderProvider(FixedHeaderProvider.create(newHeaders))
 .build();
 clientAfterModification = BigQueryWriteClient.create(stubSettings);
 }
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index be6c10dff8..afb26d4b26 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -62,6 +62,11 @@ public class StreamWriter implements AutoCloseable {
 */
 private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool;

+ /**
+ * Test-only param that records how many times a client is created.
+ */
+ private static int testOnlyClientCreatedTimes = 0;
+
 /**
 * Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be
 * shared by every stream writer in the same process.
@@ -162,24 +167,7 @@ private StreamWriter(Builder builder) throws IOException {
 BigQueryWriteClient client;
 this.streamName = builder.streamName;
 this.writerSchema = builder.writerSchema;
- boolean ownsBigQueryWriteClient;
- if (builder.client == null) {
- BigQueryWriteSettings stubSettings =
- BigQueryWriteSettings.newBuilder()
- .setCredentialsProvider(builder.credentialsProvider)
- .setTransportChannelProvider(builder.channelProvider)
- .setEndpoint(builder.endpoint)
- // (b/185842996): Temporily fix this by explicitly providing the header.
- .setHeaderProvider(
- FixedHeaderProvider.create(
- "x-goog-request-params", "write_stream=" + this.streamName))
- .build();
- client = BigQueryWriteClient.create(stubSettings);
- ownsBigQueryWriteClient = true;
- } else {
- client = builder.client;
- ownsBigQueryWriteClient = false;
- }
+ boolean ownsBigQueryWriteClient = builder.client == null;
 if (!builder.enableConnectionPool) {
 this.singleConnectionOrConnectionPool =
 SingleConnectionOrConnectionPool.ofSingleConnection(
 new ConnectionWorker(
@@ -190,7 +178,7 @@ private StreamWriter(Builder builder) throws IOException {
 builder.maxInflightBytes,
 builder.limitExceededBehavior,
 builder.traceId,
- client,
+ getBigQueryWriteClient(builder),
 ownsBigQueryWriteClient));
 } else {
 if (builder.location == "") {
@@ -204,29 +192,39 @@ private StreamWriter(Builder builder) throws IOException {
 SingleConnectionOrConnectionPool.ofConnectionPool(
 connectionPoolMap.computeIfAbsent(
 ConnectionPoolKey.create(builder.location),
- (key) ->
- new ConnectionWorkerPool(
+ (key) -> {
+ try {
+ return new ConnectionWorkerPool(
 builder.maxInflightRequest,
 builder.maxInflightBytes,
 builder.limitExceededBehavior,
 builder.traceId,
- client,
- ownsBigQueryWriteClient)));
+ getBigQueryWriteClient(builder),
+ ownsBigQueryWriteClient);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
 validateFetchedConnectonPool(builder);
- // Shut down the passed-in client. Internally we will create another client inside the
- // connection pool for every new connection worker.
- // TODO(gaole): instead of performing the close outside of the pool, change to always create
- // a new client inside the connection.
- if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
- && ownsBigQueryWriteClient) {
- client.shutdown();
- try {
- client.awaitTermination(150, TimeUnit.SECONDS);
- } catch (InterruptedException unused) {
- // Ignore interruption as this client is not used.
- }
- client.close();
- }
+ }
+ }
+
+ private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException {
+ if (builder.client == null) {
+ BigQueryWriteSettings stubSettings =
+ BigQueryWriteSettings.newBuilder()
+ .setCredentialsProvider(builder.credentialsProvider)
+ .setTransportChannelProvider(builder.channelProvider)
+ .setEndpoint(builder.endpoint)
+ // (b/185842996): Temporarily fix this by explicitly providing the header.
+ .setHeaderProvider(
+ FixedHeaderProvider.create(
+ "x-goog-request-params", "write_stream=" + this.streamName))
+ .build();
+ testOnlyClientCreatedTimes++;
+ return BigQueryWriteClient.create(stubSettings);
+ } else {
+ return builder.client;
 }
 }
@@ -237,6 +235,10 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
 this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
 builder.traceId)) {
 paramsValidatedFailed = "Trace id";
+ } else if (!Objects.equals(
+ this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(),
+ builder.client == null)) {
+ paramsValidatedFailed = "Whether using passed-in clients";
 } else if (!Objects.equals(
 this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
 builder.limitExceededBehavior)) {
@@ -347,6 +349,17 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
 return singleConnectionOrConnectionPool.getKind();
 }
+
+ @VisibleForTesting
+ static int getTestOnlyClientCreatedTimes() {
+ return testOnlyClientCreatedTimes;
+ }
+
+ @VisibleForTesting
+ static void cleanUp() {
+ testOnlyClientCreatedTimes = 0;
+ connectionPoolMap.clear();
+ }

 /** A builder of {@link StreamWriter}s.
*/ public static final class Builder { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 8ceeff4daf..80bbb76191 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -82,6 +82,7 @@ public void setUp() throws Exception { .setCredentialsProvider(NoCredentialsProvider.create()) .setTransportChannelProvider(serviceHelper.createChannelProvider()) .build()); + StreamWriter.cleanUp(); } @After @@ -89,6 +90,7 @@ public void tearDown() throws Exception { log.info("tearDown called"); client.close(); serviceHelper.stop(); + StreamWriter.cleanUp(); } private StreamWriter getMultiplexingTestStreamWriter() throws IOException { @@ -722,6 +724,24 @@ public void testInitialization_operationKind() throws Exception { } } + @Test + public void createStreamWithDifferentWhetherOwnsClient() throws Exception { + StreamWriter streamWriter1 = getMultiplexingTestStreamWriter(); + + assertThrows(IllegalArgumentException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + StreamWriter.newBuilder(TEST_STREAM) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setLocation("US") + .setEnableConnectionPool(true) + .build(); + } + }); + } + // Timeout to ensure close() doesn't wait for done callback timeout. @Test(timeout = 10000) public void testCloseDisconnectedStream() throws Exception {