Skip to content

Commit

Permalink
feat: add session pool option for modelling a timeout around session …
Browse files Browse the repository at this point in the history
…acquisition. (#2641)

* feat: add session pool option for modelling a timeout around session acquisition.

* docs: modify existing documentation.

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Co-authored-by: Knut Olav Løite <[email protected]>

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Co-authored-by: Knut Olav Løite <[email protected]>

* chore: address review comments.

* fix: lint errors.

* fix: broken unit test.

* chore: add more unit test.

* fix: review comments.

* fix: NPE in unit test.

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Co-authored-by: Knut Olav Løite <[email protected]>

* chore: fix review comments.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Knut Olav Løite <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 28, 2023
1 parent f03ce56 commit 428e294
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -120,17 +119,6 @@ class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
Expand Down Expand Up @@ -1675,7 +1663,8 @@ public PooledSession get() {
while (true) {
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
try (Scope waitScope = tracer.withSpan(span)) {
PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
PooledSession s =
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
Expand All @@ -1685,6 +1674,11 @@ public PooledSession get() {
return s;
}
} catch (Exception e) {
if (e instanceof SpannerException
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
numWaiterTimeouts.incrementAndGet();
tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
}
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
Expand All @@ -1693,15 +1687,26 @@ public PooledSession get() {
}
}

private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
private PooledSession pollUninterruptiblyWithTimeout(
long timeoutMillis, Duration acquireSessionTimeout) {
boolean interrupted = false;
try {
while (true) {
try {
return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
return acquireSessionTimeout == null
? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
: waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
interrupted = true;
} catch (TimeoutException e) {
if (acquireSessionTimeout != null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.RESOURCE_EXHAUSTED,
"Timed out after waiting "
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
+ " or increase the number of sessions in the session pool.");
}
return null;
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ public class SessionPoolOptions {
private final ActionOnSessionLeak actionOnSessionLeak;
private final boolean trackStackTraceOfSessionCheckout;
private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions;
private final long initialWaitForSessionTimeoutMillis;

/**
* Use {@link #acquireSessionTimeout} instead to specify the total duration to wait while
* acquiring session for a transaction.
*/
@Deprecated private final long initialWaitForSessionTimeoutMillis;

private final boolean autoDetectDialect;
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -78,6 +85,7 @@ private SessionPoolOptions(Builder builder) {
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -105,6 +113,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -128,6 +137,7 @@ public int hashCode() {
this.removeInactiveSessionAfter,
this.autoDetectDialect,
this.waitForMinSessions,
this.acquireSessionTimeout,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -239,6 +249,11 @@ Duration getWaitForMinSessions() {
return waitForMinSessions;
}

@VisibleForTesting
Duration getAcquireSessionTimeout() {
return acquireSessionTimeout;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -424,6 +439,7 @@ public static class Builder {
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);

private Clock poolMaintainerClock;

Expand All @@ -446,6 +462,7 @@ private Builder(SessionPoolOptions options) {
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -538,6 +555,10 @@ public Builder setFailIfPoolExhausted() {
/**
* If all sessions are in use and there is no more room for creating new sessions, block for a
* session to become available. Default behavior is same.
*
* <p>By default the requests are blocked for 60s and will fail with a `SpannerException` with
* error code `ResourceExhausted` if this timeout is exceeded. If you wish to block for a
* different period use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()}
*/
public Builder setBlockIfPoolExhausted() {
this.actionOnExhaustion = ActionOnExhaustion.BLOCK;
Expand Down Expand Up @@ -695,6 +716,25 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) {
return this;
}

/**
* If greater than zero, we wait for said duration when no sessions are available in the {@link
* SessionPool}. The default is a 60s timeout. Set the value to null to disable the timeout.
*/
public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
try {
if (acquireSessionTimeout != null) {
Preconditions.checkArgument(
acquireSessionTimeout.toMillis() > 0,
"acquireSessionTimeout should be greater than 0 ns");
}
} catch (ArithmeticException ex) {
throw new IllegalArgumentException(
"acquireSessionTimeout in millis should be lesser than Long.MAX_VALUE");
}
this.acquireSessionTimeout = acquireSessionTimeout;
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner;

import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_RESULTSET;
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Server;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@Category(SlowTest.class)
@RunWith(JUnit4.class)
public class BatchCreateSessionsSlowTest {
private static final String TEST_PROJECT = "my-project";
private static final String TEST_DATABASE_ROLE = "my-role";
private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static LocalChannelProvider channelProvider;
private Spanner spanner;

@BeforeClass
public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
mockSpanner.putStatementResult(
StatementResult.query(SELECT1, MockSpannerTestUtil.SELECT1_RESULTSET));
mockSpanner.putStatementResult(
StatementResult.query(READ_ONE_KEY_VALUE_STATEMENT, READ_ONE_KEY_VALUE_RESULTSET));

String uniqueName = InProcessServerBuilder.generateName();
server =
InProcessServerBuilder.forName(uniqueName)
// We need to use a real executor for timeouts to occur.
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
public void setUp() {
spanner =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
.setDatabaseRole(TEST_DATABASE_ROLE)
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build())
.build()
.getService();
}

@After
public void tearDown() {
mockSpanner.unfreeze();
spanner.close();
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

@Test
public void testBatchCreateSessionsTimesOut_whenDeadlineExceeded() throws Exception {
// Simulate a minimum execution time of 1000 milliseconds for the BatchCreateSessions RPC.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("my-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set the timeout and retry settings for BatchCreateSessions to a simple
// single-attempt-and-timeout after 100ms.
builder
.getSpannerStubSettingsBuilder()
.batchCreateSessionsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMillis(100));

try (Spanner spanner = builder.build().getService()) {
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
DatabaseClient client = spanner.getDatabaseClient(databaseId);

ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 5000; i++) {
final int index = i;
futures.add(
service.submit(
() -> {
// The following call is non-blocking and will not generate an exception.
ResultSet rs = client.singleUse().executeQuery(SELECT1);
// Actually trying to get any results will cause an exception.
// The DEADLINE_EXCEEDED error of the BatchCreateSessions RPC is in this case
// propagated to
// the application.
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
System.out.printf("finished test %d\n", counter.incrementAndGet());

return null;
}));
}
service.shutdown();
assertEquals(5000, Futures.allAsList(futures).get().size());
}
}

@Test
public void testBatchCreateSessionsTimesOut_whenResourceExhausted() throws Exception {
// Simulate a minimum execution time of 2000 milliseconds for the BatchCreateSessions RPC.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(2000, 0));
// Add a timeout for the max amount of time (60ms) that a request waits when a session is
// unavailable.
SessionPoolOptions sessionPoolOptions =
SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofMillis(60)).build();
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("my-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(sessionPoolOptions);
// Set the timeout and retry settings for BatchCreateSessions to a simple
// single-attempt-and-timeout after 1000ms. This will ensure that session acquisition timeout of
// 60ms will kick for all requests before the overall request RPC timeout is breached.
builder
.getSpannerStubSettingsBuilder()
.batchCreateSessionsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMillis(1000));

try (Spanner spanner = builder.build().getService()) {
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
DatabaseClient client = spanner.getDatabaseClient(databaseId);

ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 5000; i++) {
final int index = i;
futures.add(
service.submit(
() -> {
// The following call is non-blocking and will not generate an exception.
ResultSet rs = client.singleUse().executeQuery(SELECT1);
// Actually trying to get any results will cause an exception.
// When number of requests > MAX_SESSIONS, post setAcquireSessionTimeout
// a few requests will timeout with RESOURCE_EXHAUSTED error.
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.RESOURCE_EXHAUSTED, e.getErrorCode());
System.out.printf("finished test %d\n", counter.incrementAndGet());

return null;
}));
}
service.shutdown();
assertEquals(5000, Futures.allAsList(futures).get().size());
}
}
}
Loading

0 comments on commit 428e294

Please sign in to comment.