Skip to content

Commit

Permalink
feat: Support gRPC Compression (#2197)
Browse files Browse the repository at this point in the history
* Support gRPC compression

* Ignore error for method signature change

* Clean some debugging leftovers

* Remove exception from clirr-ignored-differences.xml to try and get rid of the conflict with #2192

* Remove excessive arg verification and leave it only in the StreamWriter
  • Loading branch information
yifatgortler authored Jul 21, 2023
1 parent 4897c05 commit 642e345
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -119,6 +120,10 @@ class ConnectionWorker implements AutoCloseable {
*/
private final String traceId;

/*
* Enables compression on the wire.
*/
private String compressorName = null;
/*
* Tracks current inflight requests in the stream.
*/
Expand Down Expand Up @@ -253,6 +258,7 @@ public ConnectionWorker(
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings)
throws IOException {
this.lock = new ReentrantLock();
Expand All @@ -274,6 +280,7 @@ public ConnectionWorker(
this.traceId = traceId;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
Expand Down Expand Up @@ -343,7 +350,8 @@ public void run(AppendRowsResponse response) {
public void run(Throwable finalStatus) {
doneCallback(finalStatus);
}
});
},
this.compressorName);
log.info("Finish connecting stream: " + streamName + " id: " + writerId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/** Pool of connections to accept appends and distirbute to different connections. */
Expand Down Expand Up @@ -91,6 +92,10 @@ public class ConnectionWorkerPool {
* TraceId for debugging purpose.
*/
private final String traceId;
/*
* Sets the compression to use for the calls
*/
private String compressorName;

/** Used for test on the number of times createWorker is called. */
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
Expand Down Expand Up @@ -199,12 +204,14 @@ public abstract static class Builder {
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
@Nullable String comperssorName,
BigQueryWriteSettings clientSettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.compressorName = comperssorName;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}
Expand Down Expand Up @@ -379,6 +386,7 @@ private ConnectionWorker createConnectionWorker(
maxRetryDuration,
limitExceededBehavior,
traceId,
compressorName,
clientSettings);
connectionWorkerPool.add(connectionWorker);
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,17 @@ public Builder setLocation(String location) {
return this;
}

/**
* Sets the compression to use for the calls. The compressor must be of type gzip.
*
* @param compressorName
* @return Builder
*/
public Builder setCompressorName(String compressorName) {
this.schemaAwareStreamWriterBuilder.setCompressorName(compressorName);
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
private Descriptor descriptor;
private TableSchema tableSchema;
private ProtoSchema protoSchema;
private String CompressorName;

// During some sitaution we want to skip stream writer refresh for updated schema. e.g. when
// the user provides the table schema, we should always use that schema.
Expand Down Expand Up @@ -92,7 +93,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
builder.endpoint,
builder.flowControlSettings,
builder.traceIdBase,
builder.traceId);
builder.traceId,
builder.compressorName);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
this.streamWriter = streamWriterBuilder.build();
Expand Down Expand Up @@ -276,7 +278,8 @@ private void setStreamWriterSettings(
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
@Nullable String traceIdBase,
@Nullable String traceId) {
@Nullable String traceId,
@Nullable String compressorName) {
if (channelProvider != null) {
streamWriterBuilder.setChannelProvider(channelProvider);
}
Expand Down Expand Up @@ -316,6 +319,9 @@ private void setStreamWriterSettings(
flowControlSettings.getLimitExceededBehavior());
}
}
if (compressorName != null) {
streamWriterBuilder.setCompressorName(compressorName);
}
}

/**
Expand Down Expand Up @@ -425,6 +431,7 @@ public static final class Builder<T> {
// Indicates whether multiplexing mode is enabled.
private boolean enableConnectionPool = false;
private String location;
private String compressorName;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -609,6 +616,17 @@ public Builder<T> setLocation(String location) {
return this;
}

/**
* Sets the compression to use for the calls. The compressor must be of type gzip.
*
* @param compressorName
* @return Builder
*/
public Builder<T> setCompressorName(String compressorName) {
this.compressorName = compressorName;
return this;
}

/**
* Builds SchemaAwareStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import io.grpc.CallOptions;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* StreamConnection is responsible for writing requests to a GRPC bidirecional connection.
Expand All @@ -43,11 +48,24 @@ class StreamConnection {
private RequestCallback requestCallback;
private DoneCallback doneCallback;

private static final Logger log = Logger.getLogger(StreamConnection.class.getName());

public StreamConnection(
BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) {
BigQueryWriteClient client,
RequestCallback requestCallback,
DoneCallback doneCallback,
@Nullable String compressorName) {
this.requestCallback = requestCallback;
this.doneCallback = doneCallback;

ApiCallContext apiCallContext = null;
if (compressorName != null) {
apiCallContext =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(compressorName));
log.info("gRPC compression is enabled with " + compressorName + " compression");
}

bidiStreamingCallable = client.appendRowsCallable();
clientStream =
bidiStreamingCallable.splitCall(
Expand Down Expand Up @@ -75,7 +93,8 @@ public void onComplete() {
Status.fromCode(Code.CANCELLED)
.withDescription("Stream is closed by user.")));
}
});
},
apiCallContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
builder.compressorName,
clientSettings));
} else {
if (!isDefaultStream(streamName)) {
Expand Down Expand Up @@ -276,6 +277,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
builder.compressorName,
client.getSettings());
}));
validateFetchedConnectonPool(builder);
Expand Down Expand Up @@ -598,6 +600,8 @@ public static final class Builder {

private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);

private String compressorName = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -716,6 +720,16 @@ public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
return this;
}

public Builder setCompressorName(String compressorName) {
Preconditions.checkNotNull(compressorName);
Preconditions.checkArgument(
compressorName.equals("gzip"),
"Compression of type \"%s\" isn't supported, only \"gzip\" compression is supported.",
compressorName);
this.compressorName = compressorName;
return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
clientSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public void testAppendButInflightQueueFull() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
Expand Down Expand Up @@ -388,6 +389,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
Expand Down Expand Up @@ -451,6 +453,7 @@ public void testLocationMismatch() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
StatusRuntimeException ex =
assertThrows(
Expand Down Expand Up @@ -481,6 +484,7 @@ public void testStreamNameMismatch() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
StatusRuntimeException ex =
assertThrows(
Expand Down Expand Up @@ -532,6 +536,7 @@ private ConnectionWorker createConnectionWorker(
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
}

Expand Down Expand Up @@ -625,6 +630,7 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

Expand Down Expand Up @@ -681,6 +687,7 @@ public void testLongTimeIdleWontFail() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());

long appendCount = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,4 +1452,20 @@ public void testAppendWithMissingValueMap() throws Exception {
missingValueMap);
}
}

@Test
public void testWrongCompressionType() throws Exception {
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
() -> {
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
.setCompressorName("not-gzip")
.build();
});
assertTrue(
ex.getMessage()
.contains(
"Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,20 @@ public void testMessageTooLarge() throws Exception {
writer.close();
}

@Test
public void testWrongCompressionType() throws Exception {
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
() -> {
StreamWriter.newBuilder(TEST_STREAM_1, client).setCompressorName("not-gzip").build();
});
assertTrue(
ex.getMessage()
.contains(
"Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported."));
}

@Test
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
Expand Down

0 comments on commit 642e345

Please sign in to comment.