Skip to content

Commit

Permalink
fix: remove applying header for multiplexing client and add a unit te…
Browse files Browse the repository at this point in the history
…st for multiplexing with different location (#1850)

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang authored Oct 27, 2022
1 parent 66853c2 commit 1733d5a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -327,19 +325,6 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
}
// currently we use different header for the client in each connection worker to be different
// as the backend require the header to have the same write_stream field as request body.
BigQueryWriteClient clientAfterModification = client;
if (ownsBigQueryWriteClient) {
BigQueryWriteSettings settings = client.getSettings();

// Every header to write api is required to set write_stream in the header to help routing
// the request to correct region.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(settings.toBuilder().getHeaderProvider().getHeaders());
newHeaders.put("x-goog-request-params", "write_stream=" + streamName);
BigQueryWriteSettings stubSettings =
settings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build();
clientAfterModification = BigQueryWriteClient.create(stubSettings);
}
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
Expand All @@ -348,7 +333,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
maxInflightBytes,
limitExceededBehavior,
traceId,
clientAfterModification,
client,
ownsBigQueryWriteClient);
connectionWorkerPool.add(connectionWorker);
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,6 @@ public void testBatchWriteWithCommittedStreamEU()
streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3);
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(
tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals("aaa", iter.next().get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
}

@Test
Expand Down Expand Up @@ -1256,4 +1246,54 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec
assertEquals(1L, response.get().getAppendResult().getOffset().getValue());
}
}

@Test
public void testMultiplexingMixedLocation()
throws IOException, InterruptedException, ExecutionException {
ConnectionWorkerPool.setOptions(
ConnectionWorkerPool.Settings.builder()
.setMinConnectionsPerRegion(1)
.setMaxConnectionsPerRegion(2)
.build());
String defaultStream1 =
String.format(
"projects/%s/datasets/%s/tables/%s/streams/_default",
ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
String defaultStream2 =
String.format(
"projects/%s/datasets/%s/tables/%s/streams/_default",
ServiceOptions.getDefaultProjectId(), DATASET, TABLE2);
String defaultStream3 =
String.format(
"projects/%s/datasets/%s/tables/%s/streams/_default",
ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE);

StreamWriter streamWriter1 =
StreamWriter.newBuilder(defaultStream1)
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.setEnableConnectionPool(true)
.build();
StreamWriter streamWriter2 =
StreamWriter.newBuilder(defaultStream2)
.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()))
.setEnableConnectionPool(true)
.build();
StreamWriter streamWriter3 =
StreamWriter.newBuilder(defaultStream3)
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.setEnableConnectionPool(true)
.build();
ApiFuture<AppendRowsResponse> response1 =
streamWriter1.append(CreateProtoRows(new String[] {"aaa"}));
ApiFuture<AppendRowsResponse> response2 =
streamWriter2.append(CreateProtoRowsComplex(new String[] {"aaa"}));
ApiFuture<AppendRowsResponse> response3 =
streamWriter3.append(CreateProtoRows(new String[] {"bbb"}));
assertEquals(0L, response1.get().getAppendResult().getOffset().getValue());
assertEquals(0L, response2.get().getAppendResult().getOffset().getValue());
assertEquals(0L, response3.get().getAppendResult().getOffset().getValue());
assertEquals("us", streamWriter1.getLocation());
assertEquals("us", streamWriter2.getLocation());
assertEquals("eu", streamWriter3.getLocation());
}
}

0 comments on commit 1733d5a

Please sign in to comment.