diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index eb9b365f345..0727f57b835 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -63,7 +63,6 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; import com.google.common.util.concurrent.ListenableFuture; @@ -120,17 +119,6 @@ class SessionPool { private static final Logger logger = Logger.getLogger(SessionPool.class.getName()); private static final Tracer tracer = Tracing.getTracer(); static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession"; - static final ImmutableSet SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES = - ImmutableSet.of( - ErrorCode.UNKNOWN, - ErrorCode.INVALID_ARGUMENT, - ErrorCode.PERMISSION_DENIED, - ErrorCode.UNAUTHENTICATED, - ErrorCode.RESOURCE_EXHAUSTED, - ErrorCode.FAILED_PRECONDITION, - ErrorCode.OUT_OF_RANGE, - ErrorCode.UNIMPLEMENTED, - ErrorCode.INTERNAL); /** * If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits @@ -1675,7 +1663,8 @@ public PooledSession get() { while (true) { Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan(); try (Scope waitScope = tracer.withSpan(span)) { - PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout); + PooledSession s = + pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout()); if (s == null) { // Set the status to DEADLINE_EXCEEDED and retry. numWaiterTimeouts.incrementAndGet(); @@ -1685,6 +1674,11 @@ public PooledSession get() { return s; } } catch (Exception e) { + if (e instanceof SpannerException + && ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) { + numWaiterTimeouts.incrementAndGet(); + tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED); + } TraceUtil.setWithFailure(span, e); throw e; } finally { @@ -1693,15 +1687,26 @@ public PooledSession get() { } } - private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) { + private PooledSession pollUninterruptiblyWithTimeout( + long timeoutMillis, Duration acquireSessionTimeout) { boolean interrupted = false; try { while (true) { try { - return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS); + return acquireSessionTimeout == null + ? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS) + : waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { interrupted = true; } catch (TimeoutException e) { + if (acquireSessionTimeout != null) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.RESOURCE_EXHAUSTED, + "Timed out after waiting " + + acquireSessionTimeout.toMillis() + + "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout" + + " or increase the number of sessions in the session pool."); + } return null; } catch (ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e.getCause()); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 232e71817cb..46e34b6c8ae 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -52,9 +52,16 @@ public class SessionPoolOptions { private final ActionOnSessionLeak actionOnSessionLeak; private final boolean trackStackTraceOfSessionCheckout; private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions; - private final long initialWaitForSessionTimeoutMillis; + + /** + * Use {@link #acquireSessionTimeout} instead to specify the total duration to wait while + * acquiring session for a transaction. + */ + @Deprecated private final long initialWaitForSessionTimeoutMillis; + private final boolean autoDetectDialect; private final Duration waitForMinSessions; + private final Duration acquireSessionTimeout; /** Property for allowing mocking of session maintenance clock. */ private final Clock poolMaintainerClock; @@ -78,6 +85,7 @@ private SessionPoolOptions(Builder builder) { this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter; this.autoDetectDialect = builder.autoDetectDialect; this.waitForMinSessions = builder.waitForMinSessions; + this.acquireSessionTimeout = builder.acquireSessionTimeout; this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions; this.poolMaintainerClock = builder.poolMaintainerClock; } @@ -105,6 +113,7 @@ public boolean equals(Object o) { && Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter) && Objects.equals(this.autoDetectDialect, other.autoDetectDialect) && Objects.equals(this.waitForMinSessions, other.waitForMinSessions) + && Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout) && Objects.equals( this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions) && Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock); @@ -128,6 +137,7 @@ public int hashCode() { this.removeInactiveSessionAfter, this.autoDetectDialect, this.waitForMinSessions, + this.acquireSessionTimeout, this.inactiveTransactionRemovalOptions, this.poolMaintainerClock); } @@ -239,6 +249,11 @@ Duration getWaitForMinSessions() { return waitForMinSessions; } + @VisibleForTesting + Duration getAcquireSessionTimeout() { + return acquireSessionTimeout; + } + public static Builder newBuilder() { return new Builder(); } @@ -424,6 +439,7 @@ public static class Builder { private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L); private boolean autoDetectDialect = false; private Duration waitForMinSessions = Duration.ZERO; + private Duration acquireSessionTimeout = Duration.ofSeconds(60); private Clock poolMaintainerClock; @@ -446,6 +462,7 @@ private Builder(SessionPoolOptions options) { this.removeInactiveSessionAfter = options.removeInactiveSessionAfter; this.autoDetectDialect = options.autoDetectDialect; this.waitForMinSessions = options.waitForMinSessions; + this.acquireSessionTimeout = options.acquireSessionTimeout; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -538,6 +555,10 @@ public Builder setFailIfPoolExhausted() { /** * If all sessions are in use and there is no more room for creating new sessions, block for a * session to become available. Default behavior is same. + * + *

By default the requests are blocked for 60s and will fail with a `SpannerException` with + * error code `ResourceExhausted` if this timeout is exceeded. If you wish to block for a + * different period use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()} */ public Builder setBlockIfPoolExhausted() { this.actionOnExhaustion = ActionOnExhaustion.BLOCK; @@ -695,6 +716,25 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) { return this; } + /** + * If greater than zero, we wait for said duration when no sessions are available in the {@link + * SessionPool}. The default is a 60s timeout. Set the value to null to disable the timeout. + */ + public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) { + try { + if (acquireSessionTimeout != null) { + Preconditions.checkArgument( + acquireSessionTimeout.toMillis() > 0, + "acquireSessionTimeout should be greater than 0 ns"); + } + } catch (ArithmeticException ex) { + throw new IllegalArgumentException( + "acquireSessionTimeout in millis should be lesser than Long.MAX_VALUE"); + } + this.acquireSessionTimeout = acquireSessionTimeout; + return this; + } + /** Build a SessionPoolOption object */ public SessionPoolOptions build() { validate(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsSlowTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsSlowTest.java new file mode 100644 index 00000000000..b67123e0a58 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchCreateSessionsSlowTest.java @@ -0,0 +1,207 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_RESULTSET; +import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT; +import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Server; +import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@Category(SlowTest.class) +@RunWith(JUnit4.class) +public class BatchCreateSessionsSlowTest { + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_DATABASE_ROLE = "my-role"; + private static MockSpannerServiceImpl mockSpanner; + private static Server server; + private static LocalChannelProvider channelProvider; + private Spanner spanner; + + @BeforeClass + public static void startStaticServer() throws IOException { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult( + StatementResult.query(SELECT1, MockSpannerTestUtil.SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + StatementResult.query(READ_ONE_KEY_VALUE_STATEMENT, READ_ONE_KEY_VALUE_RESULTSET)); + + String uniqueName = InProcessServerBuilder.generateName(); + server = + InProcessServerBuilder.forName(uniqueName) + // We need to use a real executor for timeouts to occur. + .scheduledExecutorService(new ScheduledThreadPoolExecutor(1)) + .addService(mockSpanner) + .build() + .start(); + channelProvider = LocalChannelProvider.create(uniqueName); + } + + @AfterClass + public static void stopServer() throws InterruptedException { + server.shutdown(); + server.awaitTermination(); + } + + @Before + public void setUp() { + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build()) + .build() + .getService(); + } + + @After + public void tearDown() { + mockSpanner.unfreeze(); + spanner.close(); + mockSpanner.reset(); + mockSpanner.removeAllExecutionTimes(); + } + + @Test + public void testBatchCreateSessionsTimesOut_whenDeadlineExceeded() throws Exception { + // Simulate a minimum execution time of 1000 milliseconds for the BatchCreateSessions RPC. + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0)); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId("my-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()); + // Set the timeout and retry settings for BatchCreateSessions to a simple + // single-attempt-and-timeout after 100ms. + builder + .getSpannerStubSettingsBuilder() + .batchCreateSessionsSettings() + .setSimpleTimeoutNoRetries(Duration.ofMillis(100)); + + try (Spanner spanner = builder.build().getService()) { + DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database"); + DatabaseClient client = spanner.getDatabaseClient(databaseId); + + ListeningExecutorService service = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000)); + List> futures = new ArrayList<>(5000); + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 5000; i++) { + final int index = i; + futures.add( + service.submit( + () -> { + // The following call is non-blocking and will not generate an exception. + ResultSet rs = client.singleUse().executeQuery(SELECT1); + // Actually trying to get any results will cause an exception. + // The DEADLINE_EXCEEDED error of the BatchCreateSessions RPC is in this case + // propagated to + // the application. + SpannerException e = assertThrows(SpannerException.class, rs::next); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); + System.out.printf("finished test %d\n", counter.incrementAndGet()); + + return null; + })); + } + service.shutdown(); + assertEquals(5000, Futures.allAsList(futures).get().size()); + } + } + + @Test + public void testBatchCreateSessionsTimesOut_whenResourceExhausted() throws Exception { + // Simulate a minimum execution time of 2000 milliseconds for the BatchCreateSessions RPC. + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime(2000, 0)); + // Add a timeout for the max amount of time (60ms) that a request waits when a session is + // unavailable. + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofMillis(60)).build(); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId("my-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions); + // Set the timeout and retry settings for BatchCreateSessions to a simple + // single-attempt-and-timeout after 1000ms. This will ensure that session acquisition timeout of + // 60ms will kick for all requests before the overall request RPC timeout is breached. + builder + .getSpannerStubSettingsBuilder() + .batchCreateSessionsSettings() + .setSimpleTimeoutNoRetries(Duration.ofMillis(1000)); + + try (Spanner spanner = builder.build().getService()) { + DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database"); + DatabaseClient client = spanner.getDatabaseClient(databaseId); + + ListeningExecutorService service = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000)); + List> futures = new ArrayList<>(5000); + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 5000; i++) { + final int index = i; + futures.add( + service.submit( + () -> { + // The following call is non-blocking and will not generate an exception. + ResultSet rs = client.singleUse().executeQuery(SELECT1); + // Actually trying to get any results will cause an exception. + // When number of requests > MAX_SESSIONS, post setAcquireSessionTimeout + // a few requests will timeout with RESOURCE_EXHAUSTED error. + SpannerException e = assertThrows(SpannerException.class, rs::next); + assertEquals(ErrorCode.RESOURCE_EXHAUSTED, e.getErrorCode()); + System.out.printf("finished test %d\n", counter.incrementAndGet()); + + return null; + })); + } + service.shutdown(); + assertEquals(5000, Futures.allAsList(futures).get().size()); + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index a979c6fed9a..24e754f4050 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -186,4 +186,36 @@ public void setNegativeIdleTimeThreshold() { SessionPoolOptions.newBuilder() .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions); } + + @Test + public void setAcquireSessionTimeout() { + SessionPoolOptions sessionPoolOptions1 = + SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofSeconds(20)).build(); + SessionPoolOptions sessionPoolOptions2 = + SessionPoolOptions.newBuilder() + .setAcquireSessionTimeout(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + + assertEquals(Duration.ofSeconds(20), sessionPoolOptions1.getAcquireSessionTimeout()); + assertEquals(Duration.ofMillis(Long.MAX_VALUE), sessionPoolOptions2.getAcquireSessionTimeout()); + } + + @Test(expected = IllegalArgumentException.class) + public void setAcquireSessionTimeout_valueLessThanLowerBound() { + SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofMillis(0)).build(); + } + + @Test(expected = IllegalArgumentException.class) + public void setAcquireSessionTimeout_valueMoreThanUpperBound() { + SessionPoolOptions.newBuilder() + .setAcquireSessionTimeout(Duration.ofSeconds(Long.MAX_VALUE)) + .build(); + } + + @Test + public void verifyDefaultAcquireSessionTimeout() { + SessionPoolOptions sessionPoolOptions = SessionPoolOptions.newBuilder().build(); + + assertEquals(Duration.ofSeconds(60), sessionPoolOptions.getAcquireSessionTimeout()); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 8949ba6afa8..6ba9e7fc924 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -980,6 +980,55 @@ public void blockAndTimeoutOnPoolExhaustion() throws Exception { .setMinSessions(minSessions) .setMaxSessions(1) .setInitialWaitForSessionTimeoutMillis(20L) + .setAcquireSessionTimeout(null) + .build(); + setupMockSessionCreation(); + pool = createPool(); + // Take the only session that can be in the pool. + PooledSessionFuture checkedOutSession = pool.getSession(); + checkedOutSession.get(); + ExecutorService executor = Executors.newFixedThreadPool(1); + final CountDownLatch latch = new CountDownLatch(1); + // Then try asynchronously to take another session. This attempt should time out. + Future fut = + executor.submit( + () -> { + latch.countDown(); + PooledSessionFuture session = pool.getSession(); + session.close(); + return null; + }); + // Wait until the background thread is actually waiting for a session. + latch.await(); + // Wait until the request has timed out. + int waitCount = 0; + while (pool.getNumWaiterTimeouts() == 0L && waitCount < 1000) { + Thread.sleep(5L); + waitCount++; + } + // Return the checked out session to the pool so the async request will get a session and + // finish. + checkedOutSession.close(); + // Verify that the async request also succeeds. + fut.get(10L, TimeUnit.SECONDS); + executor.shutdown(); + + // Verify that the session was returned to the pool and that we can get it again. + Session session = pool.getSession(); + assertThat(session).isNotNull(); + session.close(); + assertThat(pool.getNumWaiterTimeouts()).isAtLeast(1L); + } + + @Test + public void blockAndTimeoutOnPoolExhaustion_withAcquireSessionTimeout() throws Exception { + // Create a session pool with max 1 session and a low timeout for waiting for a session. + options = + SessionPoolOptions.newBuilder() + .setMinSessions(minSessions) + .setMaxSessions(1) + .setInitialWaitForSessionTimeoutMillis(20L) + .setAcquireSessionTimeout(Duration.ofMillis(20L)) .build(); setupMockSessionCreation(); pool = createPool();