Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: at connection level, retry for internal errors #1965

Merged
merged 10 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.4.0')
implementation platform('com.google.cloud:libraries-bom:26.5.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
```

## Authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
Expand All @@ -142,4 +142,9 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>long getInflightWaitSeconds(com.google.cloud.bigquery.storage.v1.StreamWriter)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -222,7 +221,6 @@ public ConnectionWorker(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("Writer schema must be provided when building this writer."));
}
this.writerSchema = writerSchema;
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.limitExceededBehavior = limitExceededBehavior;
Expand Down Expand Up @@ -379,6 +377,11 @@ public String getWriterId() {
return writerId;
}

boolean isConnectionInUnrecoverableState() {
// If final status is set, there's no
return connectionFinalStatus != null;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down Expand Up @@ -432,7 +435,7 @@ private void appendLoop() {

// Indicate whether we are at the first request after switching destination.
// True means the schema and other metadata are needed.
boolean firstRequestForDestinationSwitch = true;
boolean firstRequestForTableOrSchemaSwitch = true;
// Represent whether we have entered multiplexing.
boolean isMultiplexing = false;

Expand Down Expand Up @@ -483,25 +486,35 @@ private void appendLoop() {
resetConnection();
// Set firstRequestInConnection to indicate the next request to be sent should include
// metedata. Reset everytime after reconnection.
firstRequestForDestinationSwitch = true;
firstRequestForTableOrSchemaSwitch = true;
}
while (!localQueue.isEmpty()) {
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();

// Consider we enter multiplexing if we met a different non empty stream name.
if (!originalRequest.getWriteStream().isEmpty()
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName)) {
// Always respect the first writer schema seen by the loop.
if (writerSchema == null) {
writerSchema = originalRequest.getProtoRows().getWriterSchema();
}
// Consider we enter multiplexing if we met a different non empty stream name or we meet
// a new schema for the same stream name.
// For the schema comparision we don't use message differencer to speed up the comparing
// process. `equals(...)` can bring us false positive, e.g. two repeated field can be
// considered the same but is not considered equals(). However as long as it's never provide
// false negative we will always correctly pass writer schema to backend.
if ((!originalRequest.getWriteStream().isEmpty()
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName))
|| (originalRequest.getProtoRows().hasWriterSchema()
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForDestinationSwitch = true;
firstRequestForTableOrSchemaSwitch = true;
}

if (firstRequestForDestinationSwitch) {
if (firstRequestForTableOrSchemaSwitch) {
// If we are at the first request for every table switch, including the first request in
// the connection, we will attach both stream name and table schema to the request.
// We don't support change of schema change during multiplexing for the saeme stream name.
destinationSet.add(streamName);
if (this.traceId != null) {
originalRequestBuilder.setTraceId(this.traceId);
Expand All @@ -511,17 +524,11 @@ private void appendLoop() {
originalRequestBuilder.clearWriteStream();
}

// We don't use message differencer to speed up the comparing process.
// `equals(...)` can bring us false positive, e.g. two repeated field can be considered the
// same but is not considered equals(). However as long as it's never provide false negative
// we will always correctly pass writer schema to backend.
if (firstRequestForDestinationSwitch
|| !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
writerSchema = originalRequest.getProtoRows().getWriterSchema();
} else {
// During non table/schema switch requests, clear writer schema.
if (!firstRequestForTableOrSchemaSwitch) {
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
firstRequestForDestinationSwitch = false;
firstRequestForTableOrSchemaSwitch = false;

// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
Expand Down Expand Up @@ -716,14 +723,14 @@ private void requestCallback(AppendRowsResponse response) {
});
}

private boolean isRetriableError(Throwable t) {
private boolean isConnectionErrorRetriable(Throwable t) {
Status status = Status.fromThrowable(t);
if (Errors.isRetryableInternalStatus(status)) {
return true;
}
return status.getCode() == Code.ABORTED
|| status.getCode() == Code.UNAVAILABLE
|| status.getCode() == Code.CANCELLED;
|| status.getCode() == Code.CANCELLED
|| status.getCode() == Code.INTERNAL
|| status.getCode() == Code.FAILED_PRECONDITION
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this error for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to this CL: https://critique.corp.google.com/cl/483521407/depot/google3/cloud/helix/vortex/frontend/base/vortex_error_util.cc, except that two client trying to talk to the same stream. I think in this case, retry with a timeout also applies from client side point of view. If the situation persist then it eventually fails out, but if it is just transient, in the race of two workers, then retry still works.

|| status.getCode() == Code.DEADLINE_EXCEEDED;
}

private void doneCallback(Throwable finalStatus) {
Expand All @@ -740,7 +747,7 @@ private void doneCallback(Throwable finalStatus) {
connectionRetryStartTime = System.currentTimeMillis();
}
// If the error can be retried, don't set it here, let it try to retry later on.
if (isRetriableError(finalStatus)
if (isConnectionErrorRetriable(finalStatus)
&& !userClosed
&& (maxRetryDuration.toMillis() == 0f
|| System.currentTimeMillis() - connectionRetryStartTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,17 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
streamWriter,
(key, existingStream) -> {
// Stick to the existing stream if it's not overwhelmed.
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
if (existingStream != null
&& !existingStream.getLoad().isOverwhelmed()
&& !existingStream.isConnectionInUnrecoverableState()) {
return existingStream;
}
if (existingStream != null && existingStream.isConnectionInUnrecoverableState()) {
existingStream = null;
}
// Before search for the next connection to attach, clear the finalized connections
// first so that they will not be selected.
clearFinalizedConnectionWorker();
// Try to create or find another existing stream to reuse.
ConnectionWorker createdOrExistingConnection = null;
try {
Expand Down Expand Up @@ -299,7 +307,6 @@ private ConnectionWorker createOrReuseConnectionWorker(
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {

// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
return existingConnectionWorker;
Expand All @@ -310,6 +317,18 @@ private ConnectionWorker createOrReuseConnectionWorker(
}
}

private void clearFinalizedConnectionWorker() {
Set<ConnectionWorker> connectionWorkerSet = new HashSet<>();
for (ConnectionWorker existingWorker : connectionWorkerPool) {
if (existingWorker.isConnectionInUnrecoverableState()) {
connectionWorkerSet.add(existingWorker);
}
}
for (ConnectionWorker workerToRemove : connectionWorkerSet) {
connectionWorkerPool.remove(workerToRemove);
}
}

/** Select out the best connection worker among the given connection workers. */
static ConnectionWorker pickBestLoadConnection(
Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -450,6 +451,15 @@ static void cleanUp() {
connectionPoolMap.clear();
}

@VisibleForTesting
ConnectionWorkerPool getTestOnlyConnectionWorkerPool() {
ConnectionWorkerPool connectionWorkerPool = null;
for (Entry<ConnectionPoolKey, ConnectionWorkerPool> entry : connectionPoolMap.entrySet()) {
connectionWorkerPool = entry.getValue();
}
return connectionWorkerPool;
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ public void testAppendInSameStream_switchSchema() throws Exception {
// We will get the request as the pattern of:
// (writer_stream: t1, schema: schema1)
// (writer_stream: _, schema: _)
// (writer_stream: _, schema: schema3)
// (writer_stream: _, schema: _)
// (writer_stream: _, schema: schema1)
// (writer_stream: _, schema: _)
// (writer_stream: t1, schema: schema3)
// (writer_stream: t1, schema: _)
// (writer_stream: t1, schema: schema1)
// (writer_stream: t1, schema: _)
switch (i % 4) {
case 0:
if (i == 0) {
Expand All @@ -261,19 +261,23 @@ public void testAppendInSameStream_switchSchema() throws Exception {
.isEqualTo("foo");
break;
case 1:
assertThat(serverRequest.getWriteStream()).isEmpty();
if (i == 1) {
assertThat(serverRequest.getWriteStream()).isEmpty();
} else {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
case 2:
assertThat(serverRequest.getWriteStream()).isEmpty();
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is populated after table switch.
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("bar");
break;
case 3:
assertThat(serverRequest.getWriteStream()).isEmpty();
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
Expand Down
Loading