Skip to content

Commit

Permalink
fix: apply stream wait timeout
Browse files Browse the repository at this point in the history
Use the streamWaitTimeout that has been set on the call context when polling
from the gRPC stream. This prevents the client from blocking forever when the
stream stops delivering data and no error is propagated to the client either.

The default stream wait timeout for all call contexts is 30 minutes. This value
can be overridden by configuring a custom call context for a specific query.

Fixes #2494
  • Loading branch information
olavloite committed Jul 25, 2023
1 parent 8aa407f commit da05c44
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
Expand Down Expand Up @@ -944,6 +946,8 @@ static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>

private SpannerRpc.StreamingCall call;
private volatile boolean withBeginTransaction;
private TimeUnit streamWaitTimeoutUnit;
private long streamWaitTimeoutValue;
private SpannerException error;

@VisibleForTesting
Expand All @@ -965,6 +969,20 @@ protected final SpannerRpc.ResultStreamConsumer consumer() {
/**
 * Registers the streaming call that feeds this iterator and derives the stream wait timeout from
 * the call's {@link ApiCallContext}.
 *
 * <p>The timeout fields are only assigned when the call context carries a positive stream wait
 * timeout. A missing call context, a missing timeout or a zero timeout leaves them untouched.
 *
 * @param call the streaming call to associate with this iterator
 * @param withBeginTransaction the begin-transaction flag to record for this call
 */
public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) {
  this.call = call;
  this.withBeginTransaction = withBeginTransaction;

  ApiCallContext context = call.getCallContext();
  if (context == null) {
    return;
  }
  Duration timeout = context.getStreamWaitTimeout();
  if (timeout == null) {
    return;
  }

  // Pick the unit for the timeout. A timeout of one second or more is applied in whole seconds,
  // so any sub-second remainder is dropped. Whole seconds are still finer-grained than the
  // stream watchdog, which uses a precision of 10 seconds by default.
  long wholeSeconds = timeout.getSeconds();
  if (wholeSeconds > 0L) {
    streamWaitTimeoutValue = wholeSeconds;
    streamWaitTimeoutUnit = TimeUnit.SECONDS;
    return;
  }

  // Positive timeouts shorter than one second keep their full nanosecond precision.
  int subSecondNanos = timeout.getNano();
  if (subSecondNanos > 0) {
    streamWaitTimeoutValue = subSecondNanos;
    streamWaitTimeoutUnit = TimeUnit.NANOSECONDS;
  }
}

@Override
Expand All @@ -983,11 +1001,15 @@ public boolean isWithBeginTransaction() {
protected final PartialResultSet computeNext() {
PartialResultSet next;
try {
// TODO: Ideally honor io.grpc.Context while blocking here. In practice,
// cancellation/deadline results in an error being delivered to "stream", which
// should mean that we do not block significantly longer afterwards, but it would
// be more robust to use poll() with a timeout.
next = stream.take();
if (streamWaitTimeoutUnit != null) {
next = stream.poll(streamWaitTimeoutValue, streamWaitTimeoutUnit);
if (next == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "stream wait timeout");
}
} else {
next = stream.take();
}
} catch (InterruptedException e) {
// Treat interrupt as a request to cancel the read.
throw SpannerExceptionFactory.propagateInterrupt(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1596,20 +1596,7 @@ public StreamingCall read(
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.streamingReadCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
public void request(int numMessage) {
controller.request(numMessage);
}

// TODO(hzyi): streamController currently does not support cancel with message. Add
// this in gax and update this method later
@Override
public void cancel(String message) {
controller.cancel();
}
};
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
Expand Down Expand Up @@ -1673,22 +1660,10 @@ public StreamingCall executeQuery(
request,
SpannerGrpc.getExecuteStreamingSqlMethod(),
routeToLeader);

SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
public void request(int numMessage) {
controller.request(numMessage);
}

// TODO(hzyi): streamController currently does not support cancel with message. Add
// this in gax and update this method later
@Override
public void cancel(String message) {
controller.cancel();
}
};
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
Expand Down Expand Up @@ -1957,6 +1932,31 @@ public boolean isClosed() {
return rpcIsClosed;
}

/**
 * {@link StreamingCall} implementation that pairs the {@link ApiCallContext} a call was started
 * with and the {@link StreamController} used to drive it.
 */
private static final class GrpcStreamingCall implements StreamingCall {
  private final ApiCallContext callContext;
  private final StreamController controller;

  /**
   * @param context the call context that was passed to the streaming callable
   * @param streamController the controller that flow-control and cancel requests are sent to
   */
  GrpcStreamingCall(ApiCallContext context, StreamController streamController) {
    callContext = context;
    controller = streamController;
  }

  /** Forwards the request for {@code numMessages} more messages to the stream controller. */
  @Override
  public void request(int numMessages) {
    this.controller.request(numMessages);
  }

  /**
   * Cancels the stream through the controller. The {@code message} is not forwarded: the
   * controller's no-argument {@code cancel()} is used.
   */
  @Override
  public void cancel(@Nullable String message) {
    this.controller.cancel();
  }

  /** Returns the call context that this call was created with. */
  @Override
  public ApiCallContext getCallContext() {
    return this.callContext;
  }
}

/**
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.BackupId;
Expand Down Expand Up @@ -150,6 +151,9 @@ interface ResultStreamConsumer {
/** Handle for cancellation of a streaming read or query call. */
interface StreamingCall {

/** Returns the {@link ApiCallContext} that is used for this streaming call. */
ApiCallContext getCallContext();

/**
* Requests more messages from the stream. We disable the auto flow control mechanism in grpc,
* so we need to request messages ourself. This gives us more control over how much buffer we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
Expand All @@ -51,6 +52,7 @@
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
Expand All @@ -77,6 +79,7 @@
import com.google.spanner.v1.TypeAnnotationCode;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -2963,6 +2966,33 @@ public void testStatementWithBytesArrayParameter() {
}
}

/**
 * Verifies that a query fails with {@code DEADLINE_EXCEEDED} and a "stream wait timeout" message
 * when the call context is configured with a 1 nanosecond stream wait timeout.
 */
@Test
public void testStreamWaitTimeout() {
  // Configure every call with a 1 nanosecond stream wait timeout. This always times out, as a
  // call to the mock server always takes more than 1 nanosecond.
  CallContextConfigurator oneNanoTimeoutConfigurator =
      new CallContextConfigurator() {
        @Override
        public <ReqT, RespT> ApiCallContext configure(
            ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
          return context.withStreamWaitTimeout(Duration.ofNanos(1L));
        }
      };
  DatabaseId databaseId = DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE);
  DatabaseClient client = spanner.getDatabaseClient(databaseId);

  // Run the query inside a gRPC context that carries the configurator.
  Context.current()
      .withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, oneNanoTimeoutConfigurator)
      .run(
          () -> {
            try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
              SpannerException timeoutError =
                  assertThrows(SpannerException.class, resultSet::next);
              assertEquals(ErrorCode.DEADLINE_EXCEEDED, timeoutError.getErrorCode());
              String message = timeoutError.getMessage();
              assertTrue(message, message.contains("stream wait timeout"));
            }
          });
}

static void assertAsString(String expected, ResultSet resultSet, int col) {
assertEquals(expected, resultSet.getValue(col).getAsString());
assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -50,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

/** Unit tests for {@link com.google.cloud.spanner.AbstractResultSet.GrpcResultSet}. */
@RunWith(JUnit4.class)
Expand All @@ -58,6 +61,7 @@ public class GrpcResultSetTest {
private AbstractResultSet.GrpcResultSet resultSet;
private SpannerRpc.ResultStreamConsumer consumer;
private AbstractResultSet.GrpcStreamIterator stream;
private final Duration streamWaitTimeout = Duration.ofNanos(1L);

private static class NoOpListener implements AbstractResultSet.Listener {
@Override
Expand All @@ -78,6 +82,11 @@ public void setUp() {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault().withStreamWaitTimeout(streamWaitTimeout);
}

@Override
public void cancel(@Nullable String message) {}

Expand All @@ -93,6 +102,14 @@ public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) {
return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
}

/**
 * Verifies that reading from a stream that never receives any results fails with {@code
 * DEADLINE_EXCEEDED} and a "stream wait timeout" message.
 */
@Test
public void testStreamTimeout() {
  // Nothing is added to the stream, so the first read hits the 1ns stream wait timeout.
  SpannerException timeoutError = assertThrows(SpannerException.class, resultSet::next);
  assertEquals(ErrorCode.DEADLINE_EXCEEDED, timeoutError.getErrorCode());
  String message = timeoutError.getMessage();
  assertTrue(message, message.contains("stream wait timeout"));
}

@Test
public void metadata() {
Type rowType = Type.struct(Type.StructField.of("f", Type.string()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -115,6 +117,11 @@ private void run() throws Exception {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault();
}

@Override
public void cancel(@Nullable String message) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.NanoClock;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand Down Expand Up @@ -407,6 +409,11 @@ public void singleUseReadOnlyTransactionReturnsEmptyTransactionMetadata() {
}

private static class NoOpStreamingCall implements SpannerRpc.StreamingCall {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault();
}

@Override
public void cancel(@Nullable String message) {}

Expand Down

0 comments on commit da05c44

Please sign in to comment.