Skip to content

Commit

Permalink
feat: minor tune after offline testing (#1807)
Browse files Browse the repository at this point in the history
* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Sep 29, 2022
1 parent 9791d69 commit 694a870
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ public Load getLoad() {
public abstract static class Load {
// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
private static double overwhelmedInflightCount = 0.5;
private static double overwhelmedInflightBytes = 0.6;
private static double overwhelmedInflightCount = 0.2;
private static double overwhelmedInflightBytes = 0.2;

// Number of in-flight requests bytes in the worker.
abstract long inFlightRequestsBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public abstract static class Settings {
public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(10);
.setMaxConnectionsPerRegion(20);
}

/** Builder for the options to config {@link ConnectionWorkerPool}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,16 @@ private StreamWriter(Builder builder) throws IOException {
String fetchedLocation = writeStream.getLocation();
log.info(
String.format(
"Fethed location %s for stream name %s", fetchedLocation, streamName));
"Fethed location %s for stream name %s, extracted project and dataset name: %s\"",
fetchedLocation, streamName, datasetAndProjectName));
return fetchedLocation;
});
if (location.isEmpty()) {
throw new IllegalStateException(
String.format(
"The location is empty for both user passed in value and looked up value for "
+ "stream: %s",
streamName));
+ "stream: %s, extracted project and dataset name: %s",
streamName, datasetAndProjectName));
}
}
this.location = location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public void setUp() throws Exception {
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.build());
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5);
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6);
}

@Test
Expand Down

0 comments on commit 694a870

Please sign in to comment.