Skip to content

Commit

Permalink
feat: fix windows build failure by using nanoSeconds instead of Insta…
Browse files Browse the repository at this point in the history
…nt for better accuracy. (#1887)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: add schema update support to multiplexing

* fix: fix windows build bug: windows Instant resolution is different with
linux

* fix: fix another failing tests for windows build

* fix: fix another test failure for Windows build

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Nov 16, 2022
1 parent 0def62f commit e5cd7df
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -610,7 +609,7 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.lock();
if (response.hasUpdatedSchema()) {
this.updatedSchema =
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema());
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema());
}
try {
// Had a successful connection with at least one result, reset retries.
Expand Down Expand Up @@ -824,12 +823,12 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) {
@AutoValue
abstract static class TableSchemaAndTimestamp {
// Shows the timestamp updated schema is reported from response
abstract Instant updateTimeStamp();
abstract long updateTimeStamp();

// The updated schema returned from server.
abstract TableSchema updatedSchema();

static TableSchemaAndTimestamp create(Instant updateTimeStamp, TableSchema updatedSchema) {
static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) {
return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
Expand Down Expand Up @@ -268,7 +267,7 @@ public ApiFuture<AppendRowsResponse> append(
if (response.getWriteStream() != "" && response.hasUpdatedSchema()) {
tableNameToUpdatedSchema.put(
response.getWriteStream(),
TableSchemaAndTimestamp.create(Instant.now(), response.getUpdatedSchema()));
TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()));
}
return response;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -88,7 +87,7 @@ public class StreamWriter implements AutoCloseable {
new ConcurrentHashMap<>();

/** Creation timestamp of this streamwriter */
private final Instant creationTimestamp;
private final long creationTimestamp;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
Expand Down Expand Up @@ -260,7 +259,7 @@ private StreamWriter(Builder builder) throws IOException {
client.close();
}
}
this.creationTimestamp = Instant.now();
this.creationTimestamp = System.nanoTime();
}

@VisibleForTesting
Expand Down Expand Up @@ -414,12 +413,12 @@ public synchronized TableSchema getUpdatedSchema() {
if (tableSchemaAndTimestamp == null) {
return null;
}
return creationTimestamp.compareTo(tableSchemaAndTimestamp.updateTimeStamp()) < 0
return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp()
? tableSchemaAndTimestamp.updatedSchema()
: null;
}

Instant getCreationTimestamp() {
long getCreationTimestamp() {
return creationTimestamp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,6 @@ public void run() throws Throwable {
public void testSimpleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
// Sleep for a short period to make sure the creation timestamp is older.
Sleeper.DEFAULT.sleep(200);
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.NoCredentialsProvider;
Expand Down Expand Up @@ -310,8 +309,6 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing)
AppendRowsResponse response =
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get();
assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA);
// Sleep for a short period to make sure the creation timestamp is older.
Sleeper.DEFAULT.sleep(200);

// Create another writer, although it's the same stream name but the time stamp is newer, thus
// the old updated schema won't get returned.
Expand Down

0 comments on commit e5cd7df

Please sign in to comment.