diff --git a/README.md b/README.md
index b208a29cdd..712bb3034e 100644
--- a/README.md
+++ b/README.md
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:
```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.4.0')
+implementation platform('com.google.cloud:libraries-bom:26.5.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
```
## Authentication
diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index c55b8a691c..9833dbb1f3 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -115,7 +115,7 @@
7009
com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool
- ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)
+ ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)
7009
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 28f1f033d2..b3b2c19199 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -222,7 +222,6 @@ public ConnectionWorker(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
- this.writerSchema = writerSchema;
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.limitExceededBehavior = limitExceededBehavior;
@@ -432,7 +431,7 @@ private void appendLoop() {
// Indicate whether we are at the first request after switching destination.
// True means the schema and other metadata are needed.
- boolean firstRequestForDestinationSwitch = true;
+ boolean firstRequestForTableOrSchemaSwitch = true;
// Represent whether we have entered multiplexing.
boolean isMultiplexing = false;
@@ -483,25 +482,35 @@ private void appendLoop() {
resetConnection();
// Set firstRequestForTableOrSchemaSwitch to indicate that the next request to be sent should
// include metadata. Reset every time after reconnection.
- firstRequestForDestinationSwitch = true;
+ firstRequestForTableOrSchemaSwitch = true;
}
while (!localQueue.isEmpty()) {
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
-
- // Consider we enter multiplexing if we met a different non empty stream name.
- if (!originalRequest.getWriteStream().isEmpty()
- && !streamName.isEmpty()
- && !originalRequest.getWriteStream().equals(streamName)) {
+ // Always respect the first writer schema seen by the loop.
+ if (writerSchema == null) {
+ writerSchema = originalRequest.getProtoRows().getWriterSchema();
+ }
+ // Consider that we have entered multiplexing if we see a different non-empty stream name,
+ // or a new schema for the same stream name.
+ // For the schema comparison we don't use a message differencer, to speed up the comparison.
+ // `equals(...)` can give false positives, e.g. two repeated fields can be semantically the
+ // same yet not equals(). However, as long as it never gives a false negative, we will always
+ // correctly pass the writer schema to the backend.
+ if ((!originalRequest.getWriteStream().isEmpty()
+ && !streamName.isEmpty()
+ && !originalRequest.getWriteStream().equals(streamName))
+ || (originalRequest.getProtoRows().hasWriterSchema()
+ && !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
+ writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
- firstRequestForDestinationSwitch = true;
+ firstRequestForTableOrSchemaSwitch = true;
}
- if (firstRequestForDestinationSwitch) {
+ if (firstRequestForTableOrSchemaSwitch) {
// If we are at the first request for every table switch, including the first request in
// the connection, we will attach both stream name and table schema to the request.
- // We don't support change of schema change during multiplexing for the saeme stream name.
destinationSet.add(streamName);
if (this.traceId != null) {
originalRequestBuilder.setTraceId(this.traceId);
@@ -511,17 +520,11 @@ private void appendLoop() {
originalRequestBuilder.clearWriteStream();
}
- // We don't use message differencer to speed up the comparing process.
- // `equals(...)` can bring us false positive, e.g. two repeated field can be considered the
- // same but is not considered equals(). However as long as it's never provide false negative
- // we will always correctly pass writer schema to backend.
- if (firstRequestForDestinationSwitch
- || !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
- writerSchema = originalRequest.getProtoRows().getWriterSchema();
- } else {
+ // For requests that are not the first after a table/schema switch, clear the writer schema.
+ if (!firstRequestForTableOrSchemaSwitch) {
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
- firstRequestForDestinationSwitch = false;
+ firstRequestForTableOrSchemaSwitch = false;
// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
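
For reviewers skimming the change, the sketch below condenses the new rule in appendLoop(): metadata is attached on the first request after every table or schema switch, and a schema change on the same stream now also counts as a switch. It is not part of the patch and does not use the library's real types; plain Strings stand in for the AppendRowsRequest stream name and the ProtoSchema writer schema, and `SchemaSwitchSketch`/`mustAttachMetadata` are illustrative names only.

```Java
import java.util.ArrayList;
import java.util.List;

public class SchemaSwitchSketch {
  private String streamName = "";
  private String writerSchema = null;
  private boolean firstRequestForTableOrSchemaSwitch = true;

  /** Returns true if this request must carry the stream name and writer schema. */
  boolean mustAttachMetadata(String requestStream, String requestSchema) {
    if (writerSchema == null) {
      // Always respect the first writer schema seen by the loop.
      writerSchema = requestSchema;
    }
    boolean streamSwitch =
        !requestStream.isEmpty() && !streamName.isEmpty() && !requestStream.equals(streamName);
    boolean schemaSwitch = requestSchema != null && !requestSchema.equals(writerSchema);
    if (streamSwitch || schemaSwitch) {
      streamName = requestStream;
      writerSchema = requestSchema;
      firstRequestForTableOrSchemaSwitch = true;
    }
    boolean attach = firstRequestForTableOrSchemaSwitch;
    firstRequestForTableOrSchemaSwitch = false;
    return attach;
  }

  public static void main(String[] args) {
    SchemaSwitchSketch sketch = new SchemaSwitchSketch();
    List<Boolean> attached = new ArrayList<>();
    attached.add(sketch.mustAttachMetadata("t1", "schema-foo")); // first request: attach
    attached.add(sketch.mustAttachMetadata("t1", "schema-foo")); // nothing changed: skip
    attached.add(sketch.mustAttachMetadata("t1", "schema-bar")); // schema switch: attach
    attached.add(sketch.mustAttachMetadata("t1", "schema-foo")); // switch back: attach again
    System.out.println(attached); // [true, false, true, true]
  }
}
```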
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index 540269d734..6cc3247279 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -247,10 +247,10 @@ public void testAppendInSameStream_switchSchema() throws Exception {
// We will get requests in the following pattern:
// (writer_stream: t1, schema: schema1)
// (writer_stream: _, schema: _)
- // (writer_stream: _, schema: schema3)
- // (writer_stream: _, schema: _)
- // (writer_stream: _, schema: schema1)
- // (writer_stream: _, schema: _)
+ // (writer_stream: t1, schema: schema3)
+ // (writer_stream: t1, schema: _)
+ // (writer_stream: t1, schema: schema1)
+ // (writer_stream: t1, schema: _)
switch (i % 4) {
case 0:
if (i == 0) {
@@ -261,19 +261,23 @@ public void testAppendInSameStream_switchSchema() throws Exception {
.isEqualTo("foo");
break;
case 1:
- assertThat(serverRequest.getWriteStream()).isEmpty();
+ if (i == 1) {
+ assertThat(serverRequest.getWriteStream()).isEmpty();
+ } else {
+ assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
+ }
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
case 2:
- assertThat(serverRequest.getWriteStream()).isEmpty();
+ assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is populated after table switch.
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("bar");
break;
case 3:
- assertThat(serverRequest.getWriteStream()).isEmpty();
+ assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
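
To make the pattern in the test comment concrete, here is a standalone sketch that reproduces the expected (writer_stream, schema) sequence when the same stream alternates between two schemas. It is illustrative only: the `isMultiplexing` flag and the printed strings mirror the test comment rather than the real AppendRowsRequest wire format.

```Java
public class MultiplexedRequestPatternSketch {
  public static void main(String[] args) {
    // Same stream t1, alternating writer schemas "foo" and "bar", two requests per schema.
    String[] schemas = {"foo", "foo", "bar", "bar", "foo", "foo"};
    String currentSchema = null;
    boolean isMultiplexing = false;
    for (int i = 0; i < schemas.length; i++) {
      boolean schemaSwitch = !schemas[i].equals(currentSchema);
      if (schemaSwitch && currentSchema != null) {
        // A new schema on the same stream now flips the connection into multiplexing mode.
        isMultiplexing = true;
      }
      currentSchema = schemas[i];
      // write_stream stays populated once multiplexing; writer_schema only on switches.
      String stream = (i == 0 || isMultiplexing) ? "t1" : "_";
      String schema = schemaSwitch ? schemas[i] : "_";
      System.out.printf("(writer_stream: %s, schema: %s)%n", stream, schema);
    }
    // Expected output, matching the test comment:
    // (writer_stream: t1, schema: foo)
    // (writer_stream: _, schema: _)
    // (writer_stream: t1, schema: bar)
    // (writer_stream: t1, schema: _)
    // (writer_stream: t1, schema: foo)
    // (writer_stream: t1, schema: _)
  }
}
```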