Skip to content

Commit

Permalink
fix: use streaming read/query settings for stream retry (#2579)
Browse files Browse the repository at this point in the history
* fix: use streaming read/query settings for stream retry

Use the streaming read/query settings from the SpannerStubSettings to
determine when and how to retry a failure halfway a streaming call.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
olavloite and gcf-owl-bot[bot] authored Aug 17, 2023
1 parent 051c530 commit f78b838
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 23 deletions.
21 changes: 21 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,25 @@
<className>com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall</className>
<method>com.google.api.gax.rpc.ApiCallContext getCallContext()</method>
</difference>
<!-- (Internal change, propagate streaming retry settings) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.retrying.RetrySettings getReadRetrySettings()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.gax.retrying.RetrySettings getExecuteQueryRetrySettings()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>java.util.Set getReadRetryableCodes()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>java.util.Set getExecuteQueryRetryableCodes()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,12 @@ ResultSet executeQueryInternalWithOptions(
getExecuteSqlRequestBuilder(
statement, queryMode, options, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
new ResumableStreamIterator(
MAX_BUFFERED_CHUNKS,
SpannerImpl.QUERY,
span,
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
Expand Down Expand Up @@ -798,7 +803,12 @@ ResultSet readInternalWithOptions(
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.READ, span) {
new ResumableStreamIterator(
MAX_BUFFERED_CHUNKS,
SpannerImpl.READ,
span,
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -65,6 +67,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -1082,10 +1085,12 @@ public void onError(SpannerException e) {
@VisibleForTesting
abstract static class ResumableStreamIterator extends AbstractIterator<PartialResultSet>
implements CloseableIterator<PartialResultSet> {
private static final RetrySettings STREAMING_RETRY_SETTINGS =
private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
private final RetrySettings streamingRetrySettings;
private final Set<Code> retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
private final BackOff backOff = newBackOff();
private final BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final Span span;
Expand All @@ -1099,24 +1104,58 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
*/
private boolean safeToRetry = true;

protected ResumableStreamIterator(int maxBufferSize, String streamName, Span parent) {
protected ResumableStreamIterator(
int maxBufferSize,
String streamName,
Span parent,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
}

private static ExponentialBackOff newBackOff() {
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
}

private ExponentialBackOff newBackOff() {
if (Objects.equals(streamingRetrySettings, DEFAULT_STREAMING_RETRY_SETTINGS)) {
return new ExponentialBackOff.Builder()
.setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
.setInitialIntervalMillis(
Math.max(10, (int) streamingRetrySettings.getInitialRetryDelay().toMillis()))
.setMaxIntervalMillis(
Math.max(1000, (int) streamingRetrySettings.getMaxRetryDelay().toMillis()))
.setMaxElapsedTimeMillis(
Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
.build();
}
return new ExponentialBackOff.Builder()
.setMultiplier(STREAMING_RETRY_SETTINGS.getRetryDelayMultiplier())
.setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
// All of these values must be > 0.
.setInitialIntervalMillis(
Math.max(10, (int) STREAMING_RETRY_SETTINGS.getInitialRetryDelay().toMillis()))
Math.max(
1,
(int)
Math.min(
streamingRetrySettings.getInitialRetryDelay().toMillis(),
Integer.MAX_VALUE)))
.setMaxIntervalMillis(
Math.max(1000, (int) STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis()))
.setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
Math.max(
1,
(int)
Math.min(
streamingRetrySettings.getMaxRetryDelay().toMillis(), Integer.MAX_VALUE)))
.setMaxElapsedTimeMillis(
Math.max(
1,
(int)
Math.min(
streamingRetrySettings.getTotalTimeout().toMillis(), Integer.MAX_VALUE)))
.build();
}

private static void backoffSleep(Context context, BackOff backoff) throws SpannerException {
private void backoffSleep(Context context, BackOff backoff) throws SpannerException {
backoffSleep(context, nextBackOffMillis(backoff));
}

Expand All @@ -1128,7 +1167,7 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
}
}

private static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
tracer
.getCurrentSpan()
.addAnnotation(
Expand All @@ -1145,7 +1184,7 @@ private static void backoffSleep(Context context, long backoffMillis) throws Spa
try {
if (backoffMillis == BackOff.STOP) {
// Highly unlikely but we handle it just in case.
backoffMillis = STREAMING_RETRY_SETTINGS.getMaxRetryDelay().toMillis();
backoffMillis = streamingRetrySettings.getMaxRetryDelay().toMillis();
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
Expand Down Expand Up @@ -1233,19 +1272,20 @@ protected PartialResultSet computeNext() {
return null;
}
}
} catch (SpannerException e) {
if (safeToRetry && e.isRetryable()) {
} catch (SpannerException spannerException) {
if (safeToRetry && isRetryable(spannerException)) {
span.addAnnotation(
"Stream broken. Safe to retry", TraceUtil.getExceptionAnnotations(e));
logger.log(Level.FINE, "Retryable exception, will sleep and retry", e);
"Stream broken. Safe to retry",
TraceUtil.getExceptionAnnotations(spannerException));
logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
// Truncate any items in the buffer before the last retry token.
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
buffer.removeLast();
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
try (Scope s = tracer.withSpan(span)) {
long delay = e.getRetryDelayInMillis();
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
backoffSleep(context, delay);
} else {
Expand All @@ -1256,15 +1296,21 @@ protected PartialResultSet computeNext() {
continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
throw e;
TraceUtil.setWithFailure(span, spannerException);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry");
TraceUtil.setWithFailure(span, e);
throw e;
}
}
}

boolean isRetryable(SpannerException spannerException) {
return spannerException.isRetryable()
|| retryableCodes.contains(
GrpcStatusCode.of(spannerException.getErrorCode().getGrpcStatusCode()).getCode());
}
}

static double valueProtoToFloat64(com.google.protobuf.Value proto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
Expand Down Expand Up @@ -230,6 +231,10 @@ public class GapicSpannerRpc implements SpannerRpc {

private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final RetrySettings executeQueryRetrySettings;
private final Set<Code> executeQueryRetryableCodes;
private final RetrySettings readRetrySettings;
private final Set<Code> readRetryableCodes;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStub instanceAdminStub;
Expand Down Expand Up @@ -368,6 +373,14 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.build());
this.readRetrySettings =
options.getSpannerStubSettings().streamingReadSettings().getRetrySettings();
this.readRetryableCodes =
options.getSpannerStubSettings().streamingReadSettings().getRetryableCodes();
this.executeQueryRetrySettings =
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings();
this.executeQueryRetryableCodes =
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes();
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
Expand Down Expand Up @@ -472,6 +485,10 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
this.databaseAdminStub = null;
this.instanceAdminStub = null;
this.spannerStub = null;
this.readRetrySettings = null;
this.readRetryableCodes = null;
this.executeQueryRetrySettings = null;
this.executeQueryRetryableCodes = null;
this.partitionedDmlStub = null;
this.databaseAdminStubSettings = null;
this.spannerWatchdog = null;
Expand Down Expand Up @@ -1585,6 +1602,16 @@ public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Opt
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
public RetrySettings getReadRetrySettings() {
return readRetrySettings;
}

@Override
public Set<Code> getReadRetryableCodes() {
return readRetryableCodes;
}

@Override
public StreamingCall read(
ReadRequest request,
Expand All @@ -1599,6 +1626,16 @@ public StreamingCall read(
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
public RetrySettings getExecuteQueryRetrySettings() {
return executeQueryRetrySettings;
}

@Override
public Set<Code> getExecuteQueryRetryableCodes() {
return executeQueryRetryableCodes;
}

@Override
public ResultSet executeQuery(
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.BackupId;
import com.google.cloud.spanner.Restore;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.iam.v1.GetPolicyOptions;
import com.google.iam.v1.Policy;
Expand All @@ -53,6 +55,7 @@
import com.google.spanner.v1.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -337,6 +340,16 @@ Session createSession(
ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

/** Returns the retry settings for streaming read operations. */
default RetrySettings getReadRetrySettings() {
return SpannerStubSettings.newBuilder().streamingReadSettings().getRetrySettings();
}

/** Returns the retryable codes for streaming read operations. */
default Set<Code> getReadRetryableCodes() {
return SpannerStubSettings.newBuilder().streamingReadSettings().getRetryableCodes();
}

/**
* Performs a streaming read.
*
Expand All @@ -351,6 +364,16 @@ StreamingCall read(
@Nullable Map<Option, ?> options,
boolean routeToLeader);

/** Returns the retry settings for streaming query operations. */
default RetrySettings getExecuteQueryRetrySettings() {
return SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
}

/** Returns the retryable codes for streaming query operations. */
default Set<Code> getExecuteQueryRetryableCodes() {
return SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes();
}

/**
* Executes a query.
*
Expand Down
Loading

0 comments on commit f78b838

Please sign in to comment.