Skip to content

Commit

Permalink
feat: add multiplexing support to connection worker. (#1784)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Sep 16, 2022
1 parent 2989c1a commit a869a1d
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 42 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.20.1'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.21.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.20.1"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.21.0"
```

## Authentication
Expand Down
15 changes: 15 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,19 @@
<className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException</className>
<method>Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows, long)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows, long)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
Expand All @@ -47,6 +49,8 @@
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Support batching.
*
* <p>TODO: support updated schema
*/
public class ConnectionWorker implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
Expand All @@ -56,14 +60,15 @@ public class ConnectionWorker implements AutoCloseable {
private Condition inflightReduced;

/*
* The identifier of stream to write to.
* The identifier of the current stream to write to. This stream name can change during
* multiplexing.
*/
private final String streamName;
private String streamName;

/*
* The proto schema of rows to write.
* The proto schema of rows to write. This schema can change during multiplexing.
*/
private final ProtoSchema writerSchema;
private ProtoSchema writerSchema;

/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
Expand Down Expand Up @@ -142,6 +147,11 @@ public class ConnectionWorker implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* Tracks number of destinations handled by this connection.
*/
private final Set<String> destinationSet = ConcurrentHashMap.newKeySet();

/*
* Contains the updated TableSchema.
*/
Expand Down Expand Up @@ -241,18 +251,16 @@ public void run(Throwable finalStatus) {
});
}

/** Schedules the writing of rows at the end of current stream. */
public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
return append(rows, -1);
}

/** Schedules the writing of rows at given offset. */
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
ApiFuture<AppendRowsResponse> append(
String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
requestBuilder.setProtoRows(
ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamName);
return appendInternal(requestBuilder.build());
}

Expand Down Expand Up @@ -381,9 +389,13 @@ public void close() {
private void appendLoop() {
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
boolean streamNeedsConnecting = false;
// Set firstRequestInConnection to true immediately after connecting the steam,
// indicates then next row sent, needs the schema and other metadata.
boolean isFirstRequestInConnection = true;

// Indicate whether we are at the first request after switching destination.
// True means the schema and other metadata are needed.
boolean firstRequestForDestinationSwitch = true;
// Represent whether we have entered multiplexing.
boolean isMultiplexing = false;

while (!waitingQueueDrained()) {
this.lock.lock();
try {
Expand Down Expand Up @@ -430,13 +442,43 @@ private void appendLoop() {
}
resetConnection();
// Set firstRequestInConnection to indicate the next request to be sent should include
// metedata.
isFirstRequestInConnection = true;
// metedata. Reset everytime after reconnection.
firstRequestForDestinationSwitch = true;
}
while (!localQueue.isEmpty()) {
AppendRowsRequest preparedRequest =
prepareRequestBasedOnPosition(
localQueue.pollFirst().message, isFirstRequestInConnection);
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();

// Consider we enter multiplexing if we met a different non empty stream name.
if (!originalRequest.getWriteStream().isEmpty()
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName)) {
streamName = originalRequest.getWriteStream();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForDestinationSwitch = true;
}

if (firstRequestForDestinationSwitch) {
// If we are at the first request for every table switch, including the first request in
// the connection, we will attach both stream name and table schema to the request.
// We don't support change of schema change during multiplexing for the saeme stream name.
destinationSet.add(streamName);
if (this.traceId != null) {
originalRequestBuilder.setTraceId(this.traceId);
}
firstRequestForDestinationSwitch = false;
} else if (isMultiplexing) {
// If we are not at the first request after table switch, but we are in multiplexing
// mode, we only need the stream name but not the schema in the request.
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
} else {
// If we are not at the first request or in multiplexing, create request with no schema
// and no stream name.
originalRequestBuilder.clearWriteStream();
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
}

// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
// Otherwise send will return:
Expand All @@ -446,8 +488,7 @@ private void appendLoop() {
// TODO: Handle NOT_ENOUGH_QUOTA.
// In the close case, the request is in the inflight queue, and will either be returned
// to the user with an error, or will be resent.
this.streamConnection.send(preparedRequest);
isFirstRequestInConnection = false;
this.streamConnection.send(originalRequestBuilder.build());
}
}

Expand Down Expand Up @@ -512,24 +553,6 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
return;
}

private AppendRowsRequest prepareRequestBasedOnPosition(
AppendRowsRequest original, boolean isFirstRequest) {
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
if (isFirstRequest) {
if (this.writerSchema != null) {
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
}
requestBuilder.setWriteStream(this.streamName);
if (this.traceId != null) {
requestBuilder.setTraceId(this.traceId);
}
} else {
requestBuilder.clearWriteStream();
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
return requestBuilder.build();
}

private void cleanupInflightRequests() {
Throwable finalStatus =
new Exceptions.StreamWriterClosedException(
Expand Down Expand Up @@ -676,6 +699,16 @@ private static final class AppendRequestAndResponse {
}
}

/** Returns the current workload of this worker. */
public Load getLoad() {
return Load.create(
inflightBytes,
inflightRequests,
destinationSet.size(),
maxInflightBytes,
maxInflightRequests);
}

/**
* Represent the current workload for this worker. Used for multiplexing algorithm to determine
* the distribution of requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class StreamWriter implements AutoCloseable {
*/
private final String streamName;

/** Every writer has a fixed proto schema. */
private final ProtoSchema writerSchema;

/*
* A String that uniquely identifies this writer.
*/
Expand All @@ -56,6 +59,7 @@ public static long getApiMaxRequestBytes() {
private StreamWriter(Builder builder) throws IOException {
BigQueryWriteClient client;
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
boolean ownsBigQueryWriteClient;
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
Expand Down Expand Up @@ -123,7 +127,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
return this.connectionWorker.append(rows, offset);
return this.connectionWorker.append(streamName, writerSchema, rows, offset);
}

/**
Expand Down
Loading

0 comments on commit a869a1d

Please sign in to comment.