diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml
index 74d6d823ae6..1319b511fd6 100644
--- a/google-cloud-spanner/clirr-ignored-differences.xml
+++ b/google-cloud-spanner/clirr-ignored-differences.xml
@@ -359,4 +359,11 @@
com/google/cloud/spanner/connection/Connection
boolean isDelayTransactionStartUntilFirstWrite()
+
+
+
+ 7012
+ com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall
+ com.google.api.gax.rpc.ApiCallContext getCallContext()
+
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
index 4d9ec1cda04..37024bd2676 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java
@@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
@@ -74,6 +75,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet extends AbstractStructReader implements ResultSet {
@@ -944,6 +946,8 @@ static class GrpcStreamIterator extends AbstractIterator
private SpannerRpc.StreamingCall call;
private volatile boolean withBeginTransaction;
+ private TimeUnit streamWaitTimeoutUnit;
+ private long streamWaitTimeoutValue;
private SpannerException error;
@VisibleForTesting
@@ -965,6 +969,22 @@ protected final SpannerRpc.ResultStreamConsumer consumer() {
public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) {
this.call = call;
this.withBeginTransaction = withBeginTransaction;
+ ApiCallContext callContext = call.getCallContext();
+ Duration streamWaitTimeout = callContext == null ? null : callContext.getStreamWaitTimeout();
+ if (streamWaitTimeout != null) {
+ // Determine the timeout unit to use. This reduces the precision to seconds if the timeout
+ // value is more than 1 second, which is lower than the precision that would normally be
+ // used by the stream watchdog (which uses a precision of 10 seconds by default).
+ if (streamWaitTimeout.getSeconds() > 0L) {
+ streamWaitTimeoutValue = streamWaitTimeout.getSeconds();
+ streamWaitTimeoutUnit = TimeUnit.SECONDS;
+ } else if (streamWaitTimeout.getNano() > 0) {
+ streamWaitTimeoutValue = streamWaitTimeout.getNano();
+ streamWaitTimeoutUnit = TimeUnit.NANOSECONDS;
+ }
+ // Note that if the stream-wait-timeout is zero, we won't set a timeout at all.
+ // That is consistent with ApiCallContext#withStreamWaitTimeout(Duration.ZERO).
+ }
}
@Override
@@ -983,11 +1003,15 @@ public boolean isWithBeginTransaction() {
protected final PartialResultSet computeNext() {
PartialResultSet next;
try {
- // TODO: Ideally honor io.grpc.Context while blocking here. In practice,
- // cancellation/deadline results in an error being delivered to "stream", which
- // should mean that we do not block significantly longer afterwards, but it would
- // be more robust to use poll() with a timeout.
- next = stream.take();
+ if (streamWaitTimeoutUnit != null) {
+ next = stream.poll(streamWaitTimeoutValue, streamWaitTimeoutUnit);
+ if (next == null) {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.DEADLINE_EXCEEDED, "stream wait timeout");
+ }
+ } else {
+ next = stream.take();
+ }
} catch (InterruptedException e) {
// Treat interrupt as a request to cancel the read.
throw SpannerExceptionFactory.propagateInterrupt(e);
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index 53954097822..1afea7676db 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -1596,20 +1596,7 @@ public StreamingCall read(
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.streamingReadCallable().call(request, responseObserver, context);
- final StreamController controller = responseObserver.getController();
- return new StreamingCall() {
- @Override
- public void request(int numMessage) {
- controller.request(numMessage);
- }
-
- // TODO(hzyi): streamController currently does not support cancel with message. Add
- // this in gax and update this method later
- @Override
- public void cancel(String message) {
- controller.cancel();
- }
- };
+ return new GrpcStreamingCall(context, responseObserver.getController());
}
@Override
@@ -1673,22 +1660,10 @@ public StreamingCall executeQuery(
request,
SpannerGrpc.getExecuteStreamingSqlMethod(),
routeToLeader);
+
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
- final StreamController controller = responseObserver.getController();
- return new StreamingCall() {
- @Override
- public void request(int numMessage) {
- controller.request(numMessage);
- }
-
- // TODO(hzyi): streamController currently does not support cancel with message. Add
- // this in gax and update this method later
- @Override
- public void cancel(String message) {
- controller.cancel();
- }
- };
+ return new GrpcStreamingCall(context, responseObserver.getController());
}
@Override
@@ -1957,6 +1932,31 @@ public boolean isClosed() {
return rpcIsClosed;
}
+ private static final class GrpcStreamingCall implements StreamingCall {
+ private final ApiCallContext callContext;
+ private final StreamController controller;
+
+ GrpcStreamingCall(ApiCallContext callContext, StreamController controller) {
+ this.callContext = callContext;
+ this.controller = controller;
+ }
+
+ @Override
+ public ApiCallContext getCallContext() {
+ return callContext;
+ }
+
+ @Override
+ public void request(int numMessages) {
+ controller.request(numMessages);
+ }
+
+ @Override
+ public void cancel(@Nullable String message) {
+ controller.cancel();
+ }
+ }
+
/**
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
index 27adf89a235..62c34a58a1a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
@@ -20,6 +20,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.BackupId;
@@ -150,6 +151,9 @@ interface ResultStreamConsumer {
/** Handle for cancellation of a streaming read or query call. */
interface StreamingCall {
+ /** Returns the {@link ApiCallContext} that is used for this streaming call. */
+ ApiCallContext getCallContext();
+
/**
* Requests more messages from the stream. We disable the auto flow control mechanism in grpc,
* so we need to request messages ourself. This gives us more control over how much buffer we
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
index 0b88edc7f69..53bb30dba70 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
@@ -38,6 +38,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
@@ -51,6 +52,7 @@
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
+import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
@@ -77,6 +79,7 @@
import com.google.spanner.v1.TypeAnnotationCode;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
+import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -2963,6 +2966,63 @@ public void testStatementWithBytesArrayParameter() {
}
}
+ @Test
+ public void testStreamWaitTimeout() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ // Add a wait time to the mock server. Note that the test won't actually wait 100ms, as it uses
+ // a 1ns time out.
+ mockSpanner.setExecuteStreamingSqlExecutionTime(
+ SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
+ // Create a custom call configuration that uses a 1 nanosecond stream timeout value. This will
+ // always time out, as a call to the mock server will always take more than 1 nanosecond.
+ CallContextConfigurator configurator =
+ new CallContextConfigurator() {
+ @Override
+ public ApiCallContext configure(
+ ApiCallContext context, ReqT request, MethodDescriptor method) {
+ return context.withStreamWaitTimeout(Duration.ofNanos(1L));
+ }
+ };
+ Context context =
+ Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator);
+ context.run(
+ () -> {
+ try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
+ SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
+ assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
+ assertTrue(
+ exception.getMessage(), exception.getMessage().contains("stream wait timeout"));
+ }
+ });
+ }
+
+ @Test
+ public void testZeroStreamWaitTimeout() {
+ DatabaseClient client =
+ spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
+ // Create a custom call configuration that sets the stream timeout to zero.
+ // This should disable the timeout.
+ CallContextConfigurator configurator =
+ new CallContextConfigurator() {
+ @Override
+ public ApiCallContext configure(
+ ApiCallContext context, ReqT request, MethodDescriptor method) {
+ return context.withStreamWaitTimeout(Duration.ZERO);
+ }
+ };
+ Context context =
+ Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator);
+ context.run(
+ () -> {
+ try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
+ // A zero timeout should not cause a timeout, and instead be ignored.
+ assertTrue(resultSet.next());
+ assertFalse(resultSet.next());
+ }
+ });
+ }
+
static void assertAsString(String expected, ResultSet resultSet, int col) {
assertEquals(expected, resultSet.getValue(col).getAsString());
assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList());
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
index e35ecf9ad9a..82300a93090 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
@@ -50,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.threeten.bp.Duration;
/** Unit tests for {@link com.google.cloud.spanner.AbstractResultSet.GrpcResultSet}. */
@RunWith(JUnit4.class)
@@ -58,6 +61,7 @@ public class GrpcResultSetTest {
private AbstractResultSet.GrpcResultSet resultSet;
private SpannerRpc.ResultStreamConsumer consumer;
private AbstractResultSet.GrpcStreamIterator stream;
+ private final Duration streamWaitTimeout = Duration.ofNanos(1L);
private static class NoOpListener implements AbstractResultSet.Listener {
@Override
@@ -78,6 +82,11 @@ public void setUp() {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
+ @Override
+ public ApiCallContext getCallContext() {
+ return GrpcCallContext.createDefault().withStreamWaitTimeout(streamWaitTimeout);
+ }
+
@Override
public void cancel(@Nullable String message) {}
@@ -93,6 +102,14 @@ public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) {
return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
}
+ @Test
+ public void testStreamTimeout() {
+ // We don't add any results to the stream. That means that it will time out after 1ns.
+ SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
+ assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
+ assertTrue(exception.getMessage(), exception.getMessage().contains("stream wait timeout"));
+ }
+
@Test
public void metadata() {
Type rowType = Type.struct(Type.StructField.of("f", Type.string()));
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
index d8d39e6931c..af558d14dd4 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
@@ -18,6 +18,8 @@
import static com.google.common.truth.Truth.assertThat;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.io.Resources;
@@ -115,6 +117,11 @@ private void run() throws Exception {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
+ @Override
+ public ApiCallContext getCallContext() {
+ return GrpcCallContext.createDefault();
+ }
+
@Override
public void cancel(@Nullable String message) {}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
index 90e9a684d95..aa3fe530465 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
@@ -27,7 +27,9 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.NanoClock;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
@@ -407,6 +409,11 @@ public void singleUseReadOnlyTransactionReturnsEmptyTransactionMetadata() {
}
private static class NoOpStreamingCall implements SpannerRpc.StreamingCall {
+ @Override
+ public ApiCallContext getCallContext() {
+ return GrpcCallContext.createDefault();
+ }
+
@Override
public void cancel(@Nullable String message) {}