Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support gRPC Compression #2197

Merged
merged 6 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -119,6 +120,10 @@ class ConnectionWorker implements AutoCloseable {
*/
private final String traceId;

/*
* Enables compression on the wire.
*/
private String compressorName = null;
/*
* Tracks current inflight requests in the stream.
*/
Expand Down Expand Up @@ -253,6 +258,7 @@ public ConnectionWorker(
Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings)
throws IOException {
this.lock = new ReentrantLock();
Expand All @@ -274,6 +280,7 @@ public ConnectionWorker(
this.traceId = traceId;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
Expand Down Expand Up @@ -343,7 +350,8 @@ public void run(AppendRowsResponse response) {
public void run(Throwable finalStatus) {
doneCallback(finalStatus);
}
});
},
this.compressorName);
log.info("Finish connecting stream: " + streamName + " id: " + writerId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/** Pool of connections to accept appends and distirbute to different connections. */
Expand Down Expand Up @@ -91,6 +92,10 @@ public class ConnectionWorkerPool {
* TraceId for debugging purpose.
*/
private final String traceId;
/*
* Sets the compression to use for the calls
*/
private String compressorName;

/** Used for test on the number of times createWorker is called. */
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);
Expand Down Expand Up @@ -199,12 +204,14 @@ public abstract static class Builder {
java.time.Duration maxRetryDuration,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
@Nullable String comperssorName,
BigQueryWriteSettings clientSettings) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.compressorName = comperssorName;
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}
Expand Down Expand Up @@ -379,6 +386,7 @@ private ConnectionWorker createConnectionWorker(
maxRetryDuration,
limitExceededBehavior,
traceId,
compressorName,
clientSettings);
connectionWorkerPool.add(connectionWorker);
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,17 @@ public Builder setLocation(String location) {
return this;
}

/**
* Sets the compression to use for the calls. The compressor must be of type gzip.
*
* @param compressorName
* @return Builder
*/
public Builder setCompressorName(String compressorName) {
this.schemaAwareStreamWriterBuilder.setCompressorName(compressorName);
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
private Descriptor descriptor;
private TableSchema tableSchema;
private ProtoSchema protoSchema;
private String CompressorName;

// During some sitaution we want to skip stream writer refresh for updated schema. e.g. when
// the user provides the table schema, we should always use that schema.
Expand Down Expand Up @@ -92,7 +93,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
builder.endpoint,
builder.flowControlSettings,
builder.traceIdBase,
builder.traceId);
builder.traceId,
builder.compressorName);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
streamWriterBuilder.setLocation(builder.location);
this.streamWriter = streamWriterBuilder.build();
Expand Down Expand Up @@ -276,7 +278,8 @@ private void setStreamWriterSettings(
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
@Nullable String traceIdBase,
@Nullable String traceId) {
@Nullable String traceId,
@Nullable String compressorName) {
if (channelProvider != null) {
streamWriterBuilder.setChannelProvider(channelProvider);
}
Expand Down Expand Up @@ -316,6 +319,9 @@ private void setStreamWriterSettings(
flowControlSettings.getLimitExceededBehavior());
}
}
if (compressorName != null) {
streamWriterBuilder.setCompressorName(compressorName);
}
}

/**
Expand Down Expand Up @@ -425,6 +431,7 @@ public static final class Builder<T> {
// Indicates whether multiplexing mode is enabled.
private boolean enableConnectionPool = false;
private String location;
private String compressorName;

private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -609,6 +616,17 @@ public Builder<T> setLocation(String location) {
return this;
}

/**
* Sets the compression to use for the calls. The compressor must be of type gzip.
*
* @param compressorName
* @return Builder
*/
public Builder<T> setCompressorName(String compressorName) {
this.compressorName = compressorName;
return this;
}

/**
* Builds SchemaAwareStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import io.grpc.CallOptions;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* StreamConnection is responsible for writing requests to a GRPC bidirecional connection.
Expand All @@ -43,11 +48,24 @@ class StreamConnection {
private RequestCallback requestCallback;
private DoneCallback doneCallback;

private static final Logger log = Logger.getLogger(StreamConnection.class.getName());

public StreamConnection(
BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) {
BigQueryWriteClient client,
RequestCallback requestCallback,
DoneCallback doneCallback,
@Nullable String compressorName) {
this.requestCallback = requestCallback;
this.doneCallback = doneCallback;

ApiCallContext apiCallContext = null;
if (compressorName != null) {
apiCallContext =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(compressorName));
log.info("gRPC compression is enabled with " + compressorName + " compression");
}

bidiStreamingCallable = client.appendRowsCallable();
clientStream =
bidiStreamingCallable.splitCall(
Expand Down Expand Up @@ -75,7 +93,8 @@ public void onComplete() {
Status.fromCode(Code.CANCELLED)
.withDescription("Stream is closed by user.")));
}
});
},
apiCallContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
builder.compressorName,
clientSettings));
} else {
if (!isDefaultStream(streamName)) {
Expand Down Expand Up @@ -276,6 +277,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxRetryDuration,
builder.limitExceededBehavior,
builder.traceId,
builder.compressorName,
client.getSettings());
}));
validateFetchedConnectonPool(builder);
Expand Down Expand Up @@ -598,6 +600,8 @@ public static final class Builder {

private java.time.Duration maxRetryDuration = Duration.ofMinutes(5);

private String compressorName = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -716,6 +720,16 @@ public Builder setMaxRetryDuration(java.time.Duration maxRetryDuration) {
return this;
}

public Builder setCompressorName(String compressorName) {
Preconditions.checkNotNull(compressorName);
Preconditions.checkArgument(
compressorName.equals("gzip"),
"Compression of type \"%s\" isn't supported, only \"gzip\" compression is supported.",
compressorName);
this.compressorName = compressorName;
return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
clientSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public void testAppendButInflightQueueFull() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
Expand Down Expand Up @@ -388,6 +389,7 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
ConnectionWorker.setMaxInflightQueueWaitTime(500);
Expand Down Expand Up @@ -451,6 +453,7 @@ public void testLocationMismatch() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
StatusRuntimeException ex =
assertThrows(
Expand Down Expand Up @@ -481,6 +484,7 @@ public void testStreamNameMismatch() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
StatusRuntimeException ex =
assertThrows(
Expand Down Expand Up @@ -532,6 +536,7 @@ private ConnectionWorker createConnectionWorker(
maxRetryDuration,
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
}

Expand Down Expand Up @@ -625,6 +630,7 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));

Expand Down Expand Up @@ -681,6 +687,7 @@ public void testLongTimeIdleWontFail() throws Exception {
Duration.ofSeconds(100),
FlowController.LimitExceededBehavior.Block,
TEST_TRACE_ID,
null,
client.getSettings());

long appendCount = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,4 +1452,20 @@ public void testAppendWithMissingValueMap() throws Exception {
missingValueMap);
}
}

@Test
public void testWrongCompressionType() throws Exception {
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
() -> {
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA)
.setCompressorName("not-gzip")
.build();
});
assertTrue(
ex.getMessage()
.contains(
"Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,20 @@ public void testMessageTooLarge() throws Exception {
writer.close();
}

@Test
public void testWrongCompressionType() throws Exception {
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
() -> {
StreamWriter.newBuilder(TEST_STREAM_1, client).setCompressorName("not-gzip").build();
});
assertTrue(
ex.getMessage()
.contains(
"Compression of type \"not-gzip\" isn't supported, only \"gzip\" compression is supported."));
}

@Test
public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
Expand Down