diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index d048e1d848c..15307569a0b 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -336,6 +336,9 @@ benchmark + + + @@ -355,7 +358,7 @@ -classpath org.openjdk.jmh.Main - .* + ${benchmark.name} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 0582f4b1f58..8d731f191cc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -69,6 +69,7 @@ import io.opencensus.trace.Status; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; +import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -848,7 +849,7 @@ private void keepAlive() { } } - private void markUsed() { + void markUsed() { lastUseTime = clock.instant(); } @@ -929,24 +930,30 @@ private SessionOrError pollUninterruptiblyWithTimeout(long timeoutMillis) { } } - // Background task to maintain the pool. It closes idle sessions, keeps alive sessions that have - // not been used for a user configured time and creates session if needed to bring pool up to - // minimum required sessions. We keep track of the number of concurrent sessions being used. - // The maximum value of that over a window (10 minutes) tells us how many sessions we need in the - // pool. We close the remaining sessions. To prevent bursty traffic, we smear this out over the - // window length. We also smear out the keep alive traffic over the keep alive period. + /** + * Background task to maintain the pool. Tasks: + * + *
    + *
  • Removes idle sessions from the pool. Sessions that go above MinSessions that have not + * been used for the last 55 minutes will be removed from the pool. These will automatically + * be garbage collected by the backend. + *
  • Keeps alive sessions that have not been used for a user configured time in order to keep + * MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out + * over a window of 10 minutes to avoid bursty traffic. + *
+ */ final class PoolMaintainer { // Length of the window in millis over which we keep track of maximum number of concurrent // sessions in use. private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10)); // Frequency of the timer loop. - @VisibleForTesting static final long LOOP_FREQUENCY = 10 * 1000L; + @VisibleForTesting final long loopFrequency = options.getLoopFrequency(); // Number of loop iterations in which we need to to close all the sessions waiting for closure. - @VisibleForTesting final long numClosureCycles = windowLength.toMillis() / LOOP_FREQUENCY; + @VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency; private final Duration keepAliveMilis = Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes())); // Number of loop iterations in which we need to keep alive all the sessions - @VisibleForTesting final long numKeepAliveCycles = keepAliveMilis.toMillis() / LOOP_FREQUENCY; + @VisibleForTesting final long numKeepAliveCycles = keepAliveMilis.toMillis() / loopFrequency; Instant lastResetTime = Instant.ofEpochMilli(0); int numSessionsToClose = 0; @@ -969,8 +976,8 @@ public void run() { maintainPool(); } }, - LOOP_FREQUENCY, - LOOP_FREQUENCY, + loopFrequency, + loopFrequency, TimeUnit.MILLISECONDS); } } @@ -993,7 +1000,7 @@ void maintainPool() { running = true; } Instant currTime = clock.instant(); - closeIdleSessions(currTime); + removeIdleSessions(currTime); // Now go over all the remaining sessions and see if they need to be kept alive explicitly. keepAliveSessions(currTime); replenishPool(); @@ -1005,46 +1012,43 @@ void maintainPool() { } } - private void closeIdleSessions(Instant currTime) { - LinkedList sessionsToClose = new LinkedList<>(); + private void removeIdleSessions(Instant currTime) { synchronized (lock) { - // Every ten minutes figure out how many sessions need to be closed then close them over - // next ten minutes. - if (currTime.isAfter(lastResetTime.plus(windowLength))) { - int sessionsToKeep = - Math.max(options.getMinSessions(), maxSessionsInUse + options.getMaxIdleSessions()); - numSessionsToClose = totalSessions() - sessionsToKeep; - sessionsToClosePerLoop = (int) Math.ceil((double) numSessionsToClose / numClosureCycles); - maxSessionsInUse = 0; - lastResetTime = currTime; - } - if (numSessionsToClose > 0) { - while (sessionsToClose.size() < Math.min(numSessionsToClose, sessionsToClosePerLoop)) { - PooledSession sess = - readSessions.size() > 0 ? readSessions.poll() : writePreparedSessions.poll(); - if (sess != null) { - if (sess.state != SessionState.CLOSING) { - sess.markClosing(); - sessionsToClose.add(sess); + // Determine the minimum last use time for a session to be deemed to still be alive. Remove + // all sessions that have a lastUseTime before that time, unless it would cause us to go + // below MinSessions. Prefer to remove read sessions above write-prepared sessions. + Instant minLastUseTime = currTime.minus(options.getRemoveInactiveSessionAfter()); + for (Iterator iterator : + Arrays.asList( + readSessions.descendingIterator(), writePreparedSessions.descendingIterator())) { + while (iterator.hasNext()) { + PooledSession session = iterator.next(); + if (session.lastUseTime.isBefore(minLastUseTime)) { + if (session.state != SessionState.CLOSING) { + removeFromPool(session); + iterator.remove(); } - } else { - break; } } - numSessionsToClose -= sessionsToClose.size(); } } - for (PooledSession sess : sessionsToClose) { - logger.log(Level.FINE, "Closing session {0}", sess.getName()); - closeSessionAsync(sess); - } } private void keepAliveSessions(Instant currTime) { long numSessionsToKeepAlive = 0; synchronized (lock) { + if (numSessionsInUse >= (options.getMinSessions() + options.getMaxIdleSessions())) { + // At least MinSessions are in use, so we don't have to ping any sessions. + return; + } // In each cycle only keep alive a subset of sessions to prevent burst of traffic. - numSessionsToKeepAlive = (long) Math.ceil((double) totalSessions() / numKeepAliveCycles); + numSessionsToKeepAlive = + (long) + Math.ceil( + (double) + ((options.getMinSessions() + options.getMaxIdleSessions()) + - numSessionsInUse) + / numKeepAliveCycles); } // Now go over all the remaining sessions and see if they need to be kept alive explicitly. Instant keepAliveThreshold = currTime.minus(keepAliveMilis); @@ -1053,9 +1057,11 @@ private void keepAliveSessions(Instant currTime) { while (numSessionsToKeepAlive > 0) { PooledSession sessionToKeepAlive = null; synchronized (lock) { - sessionToKeepAlive = findSessionToKeepAlive(readSessions, keepAliveThreshold); + sessionToKeepAlive = findSessionToKeepAlive(readSessions, keepAliveThreshold, 0); if (sessionToKeepAlive == null) { - sessionToKeepAlive = findSessionToKeepAlive(writePreparedSessions, keepAliveThreshold); + sessionToKeepAlive = + findSessionToKeepAlive( + writePreparedSessions, keepAliveThreshold, readSessions.size()); } } if (sessionToKeepAlive == null) { @@ -1137,6 +1143,9 @@ private static enum Position { @GuardedBy("lock") private long numSessionsReleased = 0; + @GuardedBy("lock") + private long numIdleSessionsRemoved = 0; + private AtomicLong numWaiterTimeouts = new AtomicLong(); @GuardedBy("lock") @@ -1144,6 +1153,8 @@ private static enum Position { private final SessionConsumer sessionConsumer = new SessionConsumerImpl(); + @VisibleForTesting Function idleSessionRemovedListener; + /** * Create a session pool with the given options and for the given database. It will also start * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0. @@ -1232,6 +1243,28 @@ private SessionPool( this.initMetricsCollection(metricRegistry, labelValues); } + @VisibleForTesting + void removeFromPool(PooledSession session) { + synchronized (lock) { + if (isClosed()) { + decrementPendingClosures(1); + return; + } + session.markClosing(); + allSessions.remove(session); + numIdleSessionsRemoved++; + if (idleSessionRemovedListener != null) { + idleSessionRemovedListener.apply(session); + } + } + } + + long numIdleSessionsRemoved() { + synchronized (lock) { + return numIdleSessionsRemoved; + } + } + @VisibleForTesting int getNumberOfAvailableWritePreparedSessions() { synchronized (lock) { @@ -1313,14 +1346,18 @@ private void invalidateSession(PooledSession session) { } private PooledSession findSessionToKeepAlive( - Queue queue, Instant keepAliveThreshold) { + Queue queue, Instant keepAliveThreshold, int numAlreadyChecked) { + int numChecked = 0; Iterator iterator = queue.iterator(); - while (iterator.hasNext()) { + while (iterator.hasNext() + && (numChecked + numAlreadyChecked) + < (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) { PooledSession session = iterator.next(); if (session.lastUseTime.isBefore(keepAliveThreshold)) { iterator.remove(); return session; } + numChecked++; } return null; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 31bebac32d0..17295a38ab4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.threeten.bp.Duration; /** Options for the session pool used by {@code DatabaseClient}. */ public class SessionPoolOptions { @@ -32,7 +33,9 @@ public class SessionPoolOptions { private final int maxIdleSessions; private final float writeSessionsFraction; private final ActionOnExhaustion actionOnExhaustion; + private final long loopFrequency; private final int keepAliveIntervalMinutes; + private final Duration removeInactiveSessionAfter; private final ActionOnSessionNotFound actionOnSessionNotFound; private final long initialWaitForSessionTimeoutMillis; @@ -48,7 +51,9 @@ private SessionPoolOptions(Builder builder) { this.actionOnExhaustion = builder.actionOnExhaustion; this.actionOnSessionNotFound = builder.actionOnSessionNotFound; this.initialWaitForSessionTimeoutMillis = builder.initialWaitForSessionTimeoutMillis; + this.loopFrequency = builder.loopFrequency; this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes; + this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter; } public int getMinSessions() { @@ -71,10 +76,18 @@ public float getWriteSessionsFraction() { return writeSessionsFraction; } + long getLoopFrequency() { + return loopFrequency; + } + public int getKeepAliveIntervalMinutes() { return keepAliveIntervalMinutes; } + public Duration getRemoveInactiveSessionAfter() { + return removeInactiveSessionAfter; + } + public boolean isFailIfPoolExhausted() { return actionOnExhaustion == ActionOnExhaustion.FAIL; } @@ -118,7 +131,9 @@ public static class Builder { private ActionOnExhaustion actionOnExhaustion = DEFAULT_ACTION; private long initialWaitForSessionTimeoutMillis = 30_000L; private ActionOnSessionNotFound actionOnSessionNotFound = ActionOnSessionNotFound.RETRY; + private long loopFrequency = 10 * 1000L; private int keepAliveIntervalMinutes = 30; + private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L); /** * Minimum number of sessions that this pool will always maintain. These will be created eagerly @@ -165,6 +180,16 @@ public Builder setMaxIdleSessions(int maxIdleSessions) { return this; } + Builder setLoopFrequency(long loopFrequency) { + this.loopFrequency = loopFrequency; + return this; + } + + public Builder setRemoveInactiveSessionAfter(Duration duration) { + this.removeInactiveSessionAfter = duration; + return this; + } + /** * How frequently to keep alive idle sessions. This should be less than 60 since an idle session * is automatically closed after 60 minutes. Sessions will be kept alive by sending a dummy diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index c3f724edea9..26bbef4535b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -71,7 +71,7 @@ SessionImpl mockSession() { void runMaintainanceLoop(FakeClock clock, SessionPool pool, long numCycles) { for (int i = 0; i < numCycles; i++) { pool.poolMaintainer.maintainPool(); - clock.currentTimeMillis += SessionPool.PoolMaintainer.LOOP_FREQUENCY; + clock.currentTimeMillis += pool.poolMaintainer.loopFrequency; } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java index 6fc750b57a0..fe5599b32ca 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java @@ -18,27 +18,14 @@ import static com.google.common.truth.Truth.assertThat; -import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; -import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.protobuf.AbstractMessage; -import com.google.protobuf.ListValue; import com.google.spanner.v1.BatchCreateSessionsRequest; -import com.google.spanner.v1.ResultSetMetadata; -import com.google.spanner.v1.StructType; -import com.google.spanner.v1.StructType.Field; -import com.google.spanner.v1.TypeCode; -import io.grpc.Server; -import io.grpc.Status; -import io.grpc.inprocess.InProcessServerBuilder; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -63,8 +50,10 @@ /** * Benchmarks for common session pool scenarios. The simulated execution times are based on * reasonable estimates and are primarily intended to keep the benchmarks comparable with each other - * before and after changes have been made to the pool. The benchmarks are bound to the build - * profile `benchmark` and can be executed like this: `mvn test -Pbenchmark` + * before and after changes have been made to the pool. The benchmarks are bound to the Maven + * profile `benchmark` and can be executed like this: + * mvn clean test -DskipTests -Pbenchmark -Dbenchmark.name=SessionPoolBenchmark + * */ @BenchmarkMode(Mode.AverageTime) @Fork(value = 1, warmups = 0) @@ -79,59 +68,10 @@ public class SessionPoolBenchmark { private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 10; private static final Random RND = new Random(); - public static void main(String[] args) throws Exception { - org.openjdk.jmh.Main.main(args); - } - @State(Scope.Thread) @AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS) - public static class MockServer { - private static final int NETWORK_LATENCY_TIME = 10; - private static final int BATCH_CREATE_SESSIONS_MIN_TIME = 10; - private static final int BATCH_CREATE_SESSIONS_RND_TIME = 10; - private static final int BEGIN_TRANSACTION_MIN_TIME = 1; - private static final int BEGIN_TRANSACTION_RND_TIME = 1; - private static final int COMMIT_TRANSACTION_MIN_TIME = 5; - private static final int COMMIT_TRANSACTION_RND_TIME = 5; - private static final int ROLLBACK_TRANSACTION_MIN_TIME = 1; - private static final int ROLLBACK_TRANSACTION_RND_TIME = 1; - private static final int EXECUTE_STREAMING_SQL_MIN_TIME = 10; - private static final int EXECUTE_STREAMING_SQL_RND_TIME = 10; - private static final int EXECUTE_SQL_MIN_TIME = 10; - private static final int EXECUTE_SQL_RND_TIME = 10; - - private static final Statement UPDATE_STATEMENT = - Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); - private static final Statement INVALID_UPDATE_STATEMENT = - Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2"); - private static final long UPDATE_COUNT = 1L; - private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); - private static final ResultSetMetadata SELECT1_METADATA = - ResultSetMetadata.newBuilder() - .setRowType( - StructType.newBuilder() - .addFields( - Field.newBuilder() - .setName("COL1") - .setType( - com.google.spanner.v1.Type.newBuilder() - .setCode(TypeCode.INT64) - .build()) - .build()) - .build()) - .build(); - private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = - com.google.spanner.v1.ResultSet.newBuilder() - .addRows( - ListValue.newBuilder() - .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) - .build()) - .setMetadata(SELECT1_METADATA) - .build(); - private MockSpannerServiceImpl mockSpanner; - private Server server; - private LocalChannelProvider channelProvider; - + public static class BenchmarkState { + private StandardBenchmarkMockServer mockServer; private Spanner spanner; private DatabaseClientImpl client; @@ -152,50 +92,18 @@ public static class MockServer { /** AuxCounter for number of RPCs. */ public int numBatchCreateSessionsRpcs() { - return countRequests(BatchCreateSessionsRequest.class); + return mockServer.countRequests(BatchCreateSessionsRequest.class); } /** AuxCounter for number of sessions created. */ public int sessionsCreated() { - return mockSpanner.numSessionsCreated(); + return mockServer.getMockSpanner().numSessionsCreated(); } @Setup(Level.Invocation) public void setup() throws Exception { - mockSpanner = new MockSpannerServiceImpl(); - mockSpanner.setAbortProbability( - 0.0D); // We don't want any unpredictable aborted transactions. - mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); - mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); - mockSpanner.putStatementResult( - StatementResult.exception( - INVALID_UPDATE_STATEMENT, - Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); - - mockSpanner.setBatchCreateSessionsExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + BATCH_CREATE_SESSIONS_MIN_TIME, - BATCH_CREATE_SESSIONS_RND_TIME)); - mockSpanner.setBeginTransactionExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + BEGIN_TRANSACTION_MIN_TIME, BEGIN_TRANSACTION_RND_TIME)); - mockSpanner.setCommitExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + COMMIT_TRANSACTION_MIN_TIME, COMMIT_TRANSACTION_RND_TIME)); - mockSpanner.setRollbackExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + ROLLBACK_TRANSACTION_MIN_TIME, ROLLBACK_TRANSACTION_RND_TIME)); - mockSpanner.setExecuteStreamingSqlExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + EXECUTE_STREAMING_SQL_MIN_TIME, - EXECUTE_STREAMING_SQL_RND_TIME)); - mockSpanner.setExecuteSqlExecutionTime( - SimulatedExecutionTime.ofMinimumAndRandomTime( - NETWORK_LATENCY_TIME + EXECUTE_SQL_MIN_TIME, EXECUTE_SQL_RND_TIME)); - - String uniqueName = InProcessServerBuilder.generateName(); - server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); - channelProvider = LocalChannelProvider.create(uniqueName); + mockServer = new StandardBenchmarkMockServer(); + TransportChannelProvider channelProvider = mockServer.start(); SpannerOptions options = SpannerOptions.newBuilder() @@ -226,31 +134,18 @@ public void setup() throws Exception { @TearDown(Level.Invocation) public void teardown() throws Exception { spanner.close(); - server.shutdown(); - server.awaitTermination(); + mockServer.shutdown(); } int expectedStepsToMax() { int remainder = (maxSessions - minSessions) % incStep == 0 ? 0 : 1; return numChannels + ((maxSessions - minSessions) / incStep) + remainder; } - - int countRequests(final Class type) { - return Collections2.filter( - mockSpanner.getRequests(), - new Predicate() { - @Override - public boolean apply(AbstractMessage input) { - return input.getClass().equals(type); - } - }) - .size(); - } } /** Measures the time needed to execute a burst of read requests. */ @Benchmark - public void burstRead(final MockServer server) throws Exception { + public void burstRead(final BenchmarkState server) throws Exception { int totalQueries = server.maxSessions * 8; int parallelThreads = server.maxSessions * 2; final DatabaseClient client = @@ -268,7 +163,8 @@ public void burstRead(final MockServer server) throws Exception { @Override public Void call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); - try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { while (rs.next()) { Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); } @@ -283,7 +179,7 @@ public Void call() throws Exception { /** Measures the time needed to execute a burst of write requests. */ @Benchmark - public void burstWrite(final MockServer server) throws Exception { + public void burstWrite(final BenchmarkState server) throws Exception { int totalWrites = server.maxSessions * 8; int parallelThreads = server.maxSessions * 2; final DatabaseClient client = @@ -306,7 +202,8 @@ public Long call() throws Exception { new TransactionCallable() { @Override public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); } }); } @@ -318,7 +215,7 @@ public Long run(TransactionContext transaction) throws Exception { /** Measures the time needed to execute a burst of read and write requests. */ @Benchmark - public void burstReadAndWrite(final MockServer server) throws Exception { + public void burstReadAndWrite(final BenchmarkState server) throws Exception { int totalWrites = server.maxSessions * 4; int totalReads = server.maxSessions * 4; int parallelThreads = server.maxSessions * 2; @@ -342,7 +239,8 @@ public Long call() throws Exception { new TransactionCallable() { @Override public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(MockServer.UPDATE_STATEMENT); + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); } }); } @@ -355,7 +253,8 @@ public Long run(TransactionContext transaction) throws Exception { @Override public Void call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); - try (ResultSet rs = client.singleUse().executeQuery(MockServer.SELECT1)) { + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { while (rs.next()) { Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); } @@ -370,7 +269,7 @@ public Void call() throws Exception { /** Measures the time needed to acquire MaxSessions session sequentially. */ @Benchmark - public void steadyIncrease(MockServer server) throws Exception { + public void steadyIncrease(BenchmarkState server) throws Exception { final DatabaseClient client = server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); SessionPool pool = ((DatabaseClientImpl) client).pool; @@ -380,7 +279,7 @@ public void steadyIncrease(MockServer server) throws Exception { List transactions = new ArrayList<>(server.maxSessions); for (int i = 0; i < server.maxSessions; i++) { ReadOnlyTransaction tx = client.readOnlyTransaction(); - tx.executeQuery(MockServer.SELECT1); + tx.executeQuery(StandardBenchmarkMockServer.SELECT1); transactions.add(tx); } for (ReadOnlyTransaction tx : transactions) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java new file mode 100644 index 00000000000..baba73f9e97 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java @@ -0,0 +1,268 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.BatchCreateSessionsRequest; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.DeleteSessionRequest; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.threeten.bp.Duration; + +/** + * Benchmarks for the SessionPoolMaintainer. Run these benchmarks from the command line like this: + * + * mvn clean test -DskipTests -Pbenchmark -Dbenchmark.name=SessionPoolMaintainerBenchmark + * + */ +@BenchmarkMode(Mode.AverageTime) +@Fork(value = 1, warmups = 0) +@Measurement(batchSize = 1, iterations = 1, timeUnit = TimeUnit.MILLISECONDS) +@Warmup(batchSize = 0, iterations = 0) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SessionPoolMaintainerBenchmark { + private static final String TEST_PROJECT = "my-project"; + private static final String TEST_INSTANCE = "my-instance"; + private static final String TEST_DATABASE = "my-database"; + private static final int HOLD_SESSION_TIME = 10; + private static final int RND_WAIT_TIME_BETWEEN_REQUESTS = 100; + private static final Random RND = new Random(); + + @State(Scope.Thread) + @AuxCounters(org.openjdk.jmh.annotations.AuxCounters.Type.EVENTS) + public static class MockServer { + private StandardBenchmarkMockServer mockServer; + private Spanner spanner; + private DatabaseClientImpl client; + + /** + * The tests set the session idle timeout to an extremely low value to force timeouts and + * sessions to be evicted from the pool. This is not intended to replicate a realistic scenario, + * only to detect whether certain changes to the client library might cause the number of RPCs + * or the execution time to change drastically. + */ + @Param({"100"}) + long idleTimeout; + + /** AuxCounter for number of create RPCs. */ + public int numBatchCreateSessionsRpcs() { + return mockServer.countRequests(BatchCreateSessionsRequest.class); + } + + /** AuxCounter for number of delete RPCs. */ + public int numDeleteSessionRpcs() { + return mockServer.countRequests(DeleteSessionRequest.class); + } + + /** AuxCounter for number of begin tx RPCs. */ + public int numBeginTransactionRpcs() { + return mockServer.countRequests(BeginTransactionRequest.class); + } + + @Setup(Level.Invocation) + public void setup() throws Exception { + mockServer = new StandardBenchmarkMockServer(); + TransportChannelProvider channelProvider = mockServer.start(); + + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + // Set idle timeout and loop frequency to very low values. + .setRemoveInactiveSessionAfter(Duration.ofMillis(idleTimeout)) + .setLoopFrequency(idleTimeout / 10) + .build()) + .build(); + + spanner = options.getService(); + client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Wait until the session pool has initialized. + while (client.pool.getNumberOfSessionsInPool() + < spanner.getOptions().getSessionPoolOptions().getMinSessions()) { + Thread.sleep(1L); + } + } + + @TearDown(Level.Invocation) + public void teardown() throws Exception { + spanner.close(); + mockServer.shutdown(); + } + } + + /** Measures the time and RPCs needed to execute read requests. */ + @Benchmark + public void read(final MockServer server) throws Exception { + int min = server.spanner.getOptions().getSessionPoolOptions().getMinSessions(); + int max = server.spanner.getOptions().getSessionPoolOptions().getMaxSessions(); + int totalQueries = max * 4; + int parallelThreads = min; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(min); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalQueries); + for (int i = 0; i < totalQueries; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time and RPCs needed to execute write requests. */ + @Benchmark + public void write(final MockServer server) throws Exception { + int min = server.spanner.getOptions().getSessionPoolOptions().getMinSessions(); + int max = server.spanner.getOptions().getSessionPoolOptions().getMaxSessions(); + int totalWrites = max * 4; + int parallelThreads = max; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(min); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } + + /** Measures the time and RPCs needed to execute read and write requests. */ + @Benchmark + public void readAndWrite(final MockServer server) throws Exception { + int min = server.spanner.getOptions().getSessionPoolOptions().getMinSessions(); + int max = server.spanner.getOptions().getSessionPoolOptions().getMaxSessions(); + int totalWrites = max * 2; + int totalReads = max * 2; + int parallelThreads = max; + final DatabaseClient client = + server.spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + SessionPool pool = ((DatabaseClientImpl) client).pool; + assertThat(pool.totalSessions()).isEqualTo(min); + + ListeningScheduledExecutorService service = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(parallelThreads)); + List> futures = new ArrayList<>(totalReads + totalWrites); + for (int i = 0; i < totalWrites; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + TransactionRunner runner = client.readWriteTransaction(); + return runner.run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate( + StandardBenchmarkMockServer.UPDATE_STATEMENT); + } + }); + } + })); + } + for (int i = 0; i < totalReads; i++) { + futures.add( + service.submit( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); + try (ResultSet rs = + client.singleUse().executeQuery(StandardBenchmarkMockServer.SELECT1)) { + while (rs.next()) { + Thread.sleep(RND.nextInt(HOLD_SESSION_TIME)); + } + return null; + } + } + })); + } + Futures.allAsList(futures).get(); + service.shutdown(); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java new file mode 100644 index 00000000000..8d1b7804327 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -0,0 +1,315 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.cloud.spanner.SessionClient.SessionConsumer; +import com.google.cloud.spanner.SessionPool.PooledSession; +import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.common.base.Function; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +@RunWith(JUnit4.class) +public class SessionPoolMaintainerTest extends BaseSessionPoolTest { + private ExecutorService executor = Executors.newSingleThreadExecutor(); + private @Mock SpannerImpl client; + private @Mock SessionClient sessionClient; + private @Mock SpannerOptions spannerOptions; + private DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused"); + private SessionPoolOptions options; + private FakeClock clock = new FakeClock(); + private List idledSessions = new ArrayList<>(); + private Map pingedSessions = new HashMap<>(); + + @Before + public void setUp() throws Exception { + initMocks(this); + when(client.getOptions()).thenReturn(spannerOptions); + when(client.getSessionClient(db)).thenReturn(sessionClient); + when(spannerOptions.getNumChannels()).thenReturn(4); + setupMockSessionCreation(); + options = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxIdleSessions(1) + .setMaxSessions(5) + .setIncStep(1) + .setKeepAliveIntervalMinutes(2) + .build(); + idledSessions.clear(); + pingedSessions.clear(); + } + + private void setupMockSessionCreation() { + doAnswer( + new Answer() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + executor.submit( + new Runnable() { + @Override + public void run() { + int sessionCount = invocation.getArgumentAt(0, Integer.class); + SessionConsumerImpl consumer = + invocation.getArgumentAt(2, SessionConsumerImpl.class); + for (int i = 0; i < sessionCount; i++) { + consumer.onSessionReady(setupMockSession(mockSession())); + } + } + }); + return null; + } + }) + .when(sessionClient) + .asyncBatchCreateSessions( + Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); + } + + private SessionImpl setupMockSession(final SessionImpl session) { + ReadContext mockContext = mock(ReadContext.class); + final ResultSet mockResult = mock(ResultSet.class); + when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext); + when(mockContext.executeQuery(any(Statement.class))) + .thenAnswer( + new Answer() { + @Override + public ResultSet answer(InvocationOnMock invocation) throws Throwable { + Integer currentValue = pingedSessions.get(session.getName()); + if (currentValue == null) { + currentValue = 0; + } + pingedSessions.put(session.getName(), ++currentValue); + return mockResult; + } + }); + when(mockResult.next()).thenReturn(true); + return session; + } + + private SessionPool createPool() throws Exception { + SessionPool pool = + SessionPool.createPool( + options, new TestExecutorFactory(), client.getSessionClient(db), clock); + pool.idleSessionRemovedListener = + new Function() { + @Override + public Void apply(PooledSession input) { + idledSessions.add(input); + return null; + } + }; + // Wait until pool has initialized. + while (pool.totalSessions() < options.getMinSessions()) { + Thread.sleep(1L); + } + return pool; + } + + @Test + public void testKeepAlive() throws Exception { + SessionPool pool = createPool(); + assertThat(pingedSessions).isEmpty(); + // Run one maintenance loop. No sessions should get a keep-alive ping. + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).isEmpty(); + + // Checkout two sessions and do a maintenance loop. Still no sessions should be getting any + // pings. + Session session1 = pool.getReadSession(); + Session session2 = pool.getReadSession(); + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).isEmpty(); + + // Check the sessions back into the pool and do a maintenance loop. + session2.close(); + session1.close(); + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).isEmpty(); + + // Now advance the time enough for both sessions in the pool to be idled. Then do one + // maintenance loop. This should cause the last session to have been checked back into the pool + // to get a ping, but not the second session. + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).containsExactly(session1.getName(), 1); + // Do another maintenance loop. This should cause the other session to also get a ping. + runMaintainanceLoop(clock, pool, 1); + assertThat(pingedSessions).containsExactly(session1.getName(), 1, session2.getName(), 1); + + // Now check out three sessions so the pool will create an additional session. The pool will + // only keep 2 sessions alive, as that is the setting for MinSessions. + Session session3 = pool.getReadSession(); + Session session4 = pool.getReadSession(); + Session session5 = pool.getReadSession(); + // Note that session2 was now the first session in the pool as it was the last to receive a + // ping. + assertThat(session3.getName()).isEqualTo(session2.getName()); + assertThat(session4.getName()).isEqualTo(session1.getName()); + session5.close(); + session4.close(); + session3.close(); + // Advance the clock to force pings for the sessions in the pool and do three maintenance loops. + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + runMaintainanceLoop(clock, pool, 3); + assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 2); + + // Advance the clock to idle all sessions in the pool again and then check out one session. This + // should cause only one session to get a ping. + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + // We are now checking out session2 because + Session session6 = pool.getReadSession(); + // The session that was first in the pool now is equal to the initial first session as each full + // round of pings will swap the order of the first MinSessions sessions in the pool. + assertThat(session6.getName()).isEqualTo(session1.getName()); + runMaintainanceLoop(clock, pool, 3); + assertThat(pool.totalSessions()).isEqualTo(3); + assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 3); + // Update the last use date and release the session to the pool and do another maintenance + // cycle. + ((PooledSession) session6).markUsed(); + session6.close(); + runMaintainanceLoop(clock, pool, 3); + assertThat(pingedSessions).containsExactly(session1.getName(), 2, session2.getName(), 3); + + // Now check out 3 sessions again and make sure the 'extra' session is checked in last. That + // will make it eligible for pings. + Session session7 = pool.getReadSession(); + Session session8 = pool.getReadSession(); + Session session9 = pool.getReadSession(); + + assertThat(session7.getName()).isEqualTo(session1.getName()); + assertThat(session8.getName()).isEqualTo(session2.getName()); + assertThat(session9.getName()).isEqualTo(session5.getName()); + + session7.close(); + session8.close(); + session9.close(); + + clock.currentTimeMillis += TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()) + 1; + runMaintainanceLoop(clock, pool, 3); + // session1 will not get a ping this time, as it was checked in first and is now the last + // session in the pool. + assertThat(pingedSessions) + .containsExactly(session1.getName(), 2, session2.getName(), 4, session5.getName(), 1); + } + + @Test + public void testIdleSessions() throws Exception { + SessionPool pool = createPool(); + long loopsToIdleSessions = + Double.valueOf( + Math.ceil( + (double) options.getRemoveInactiveSessionAfter().toMillis() + / pool.poolMaintainer.loopFrequency)) + .longValue() + + 2L; + assertThat(idledSessions).isEmpty(); + // Run one maintenance loop. No sessions should be removed from the pool. + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).isEmpty(); + + // Checkout two sessions and do a maintenance loop. Still no sessions should be removed. + Session session1 = pool.getReadSession(); + Session session2 = pool.getReadSession(); + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).isEmpty(); + + // Check the sessions back into the pool and do a maintenance loop. + session2.close(); + session1.close(); + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).isEmpty(); + + // Now advance the time enough for both sessions in the pool to be idled. Both sessions should + // be kept alive by the maintainer and remain in the pool. + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).isEmpty(); + + // Now check out three sessions so the pool will create an additional session. The pool will + // only keep 2 sessions alive, as that is the setting for MinSessions. + Session session3 = pool.getReadSession(); + Session session4 = pool.getReadSession(); + Session session5 = pool.getReadSession(); + // Note that session2 was now the first session in the pool as it was the last to receive a + // ping. + assertThat(session3.getName()).isEqualTo(session2.getName()); + assertThat(session4.getName()).isEqualTo(session1.getName()); + session5.close(); + session4.close(); + session3.close(); + // Advance the clock to idle sessions. The pool will keep session4 and session3 alive, session5 + // will be idled and removed. + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).containsExactly(session5); + assertThat(pool.totalSessions()).isEqualTo(2); + + // Check out three sessions again and keep one session checked out. + Session session6 = pool.getReadSession(); + Session session7 = pool.getReadSession(); + Session session8 = pool.getReadSession(); + session8.close(); + session7.close(); + // Now advance the clock to idle sessions. This should remove session8 from the pool. + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).containsExactly(session5, session8); + assertThat(pool.totalSessions()).isEqualTo(2); + ((PooledSession) session6).markUsed(); + session6.close(); + + // Check out three sessions and keep them all checked out. No sessions should be removed from + // the pool. + Session session9 = pool.getReadSession(); + Session session10 = pool.getReadSession(); + Session session11 = pool.getReadSession(); + runMaintainanceLoop(clock, pool, loopsToIdleSessions); + assertThat(idledSessions).containsExactly(session5, session8); + assertThat(pool.totalSessions()).isEqualTo(3); + // Return the sessions to the pool. As they have not been used, they are all into idle time. + // Running the maintainer will now remove all the sessions from the pool and then start the + // replenish method. + session9.close(); + session10.close(); + session11.close(); + runMaintainanceLoop(clock, pool, 1); + assertThat(idledSessions).containsExactly(session5, session8, session9, session10, session11); + // Check that the pool is replenished. + while (pool.totalSessions() < options.getMinSessions()) { + Thread.sleep(1L); + } + assertThat(pool.totalSessions()).isEqualTo(options.getMinSessions()); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index f7c2f02a599..bc0460c0301 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -17,7 +17,7 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Matchers.any; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -25,7 +25,9 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.spanner.SessionClient.SessionConsumer; +import com.google.cloud.spanner.SessionPool.PooledSession; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.common.base.Function; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Empty; import java.util.ArrayList; @@ -102,7 +104,7 @@ private void setupSpanner(DatabaseId db) { @Override public Session answer(InvocationOnMock invocation) throws Throwable { synchronized (lock) { - Session session = mockSession(); + SessionImpl session = mockSession(); setupSession(session); sessions.put(session.getName(), false); @@ -140,7 +142,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Mockito.anyInt(), Mockito.anyBoolean(), Mockito.any(SessionConsumer.class)); } - private void setupSession(final Session session) { + private void setupSession(final SessionImpl session) { ReadContext mockContext = mock(ReadContext.class); final ResultSet mockResult = mock(ResultSet.class); when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext); @@ -258,6 +260,16 @@ public void stressTest() throws Exception { pool = SessionPool.createPool( builder.build(), new TestExecutorFactory(), mockSpanner.getSessionClient(db), clock); + pool.idleSessionRemovedListener = + new Function() { + @Override + public Void apply(PooledSession pooled) { + synchronized (lock) { + sessions.remove(pooled.getName()); + return null; + } + } + }; for (int i = 0; i < concurrentThreads; i++) { new Thread( new Runnable() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index a9636dea1bd..ab02cb9b4c2 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -33,7 +33,6 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.spanner.MetricRegistryTestUtils.FakeMetricRegistry; @@ -47,6 +46,7 @@ import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; @@ -71,7 +71,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -874,7 +873,6 @@ public void idleSessionCleanup() throws Exception { SessionImpl session1 = mockSession(); SessionImpl session2 = mockSession(); SessionImpl session3 = mockSession(); - final AtomicInteger numSessionClosed = new AtomicInteger(); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); doAnswer( @@ -895,18 +893,8 @@ public void run() { }) .when(sessionClient) .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); - for (Session session : new Session[] {session1, session2, session3}) { - doAnswer( - new Answer>() { - - @Override - public ApiFuture answer(InvocationOnMock invocation) throws Throwable { - numSessionClosed.incrementAndGet(); - return ApiFutures.immediateFuture(Empty.getDefaultInstance()); - } - }) - .when(session) - .asyncClose(); + for (SessionImpl session : sessions) { + mockKeepAlive(session); } FakeClock clock = new FakeClock(); clock.currentTimeMillis = System.currentTimeMillis(); @@ -914,25 +902,29 @@ public ApiFuture answer(InvocationOnMock invocation) throws Throwable { // Make sure pool has been initialized pool.getReadSession().close(); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numClosureCycles); - assertThat(numSessionClosed.get()).isEqualTo(0); + assertThat(pool.numIdleSessionsRemoved()).isEqualTo(0L); Session readSession1 = pool.getReadSession(); Session readSession2 = pool.getReadSession(); Session readSession3 = pool.getReadSession(); readSession1.close(); readSession2.close(); readSession3.close(); - // Now there are 3 sessions in the pool but since all were used in parallel, we will not close - // any. + // Now there are 3 sessions in the pool but since none of them has timed out, they will all be + // kept in the pool. runMaintainanceLoop(clock, pool, pool.poolMaintainer.numClosureCycles); - assertThat(numSessionClosed.get()).isEqualTo(0); + assertThat(pool.numIdleSessionsRemoved()).isEqualTo(0L); // Counters have now been reset // Use all 3 sessions sequentially pool.getReadSession().close(); pool.getReadSession().close(); pool.getReadSession().close(); - runMaintainanceLoop(clock, pool, pool.poolMaintainer.numClosureCycles); + // Advance the time by running the maintainer. This should cause + // one session to be kept alive and two sessions to be removed. + long cycles = + options.getRemoveInactiveSessionAfter().toMillis() / pool.poolMaintainer.loopFrequency; + runMaintainanceLoop(clock, pool, cycles); // We will still close 2 sessions since at any point in time only 1 session was in use. - assertThat(numSessionClosed.get()).isEqualTo(2); + assertThat(pool.numIdleSessionsRemoved()).isEqualTo(2L); pool.closeAsync().get(5L, TimeUnit.SECONDS); } @@ -975,15 +967,114 @@ public void run() { verify(session, never()).singleUse(any(TimestampBound.class)); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); verify(session, times(2)).singleUse(any(TimestampBound.class)); - clock.currentTimeMillis += clock.currentTimeMillis + 35 * 60 * 1000; + clock.currentTimeMillis += + clock.currentTimeMillis + (options.getKeepAliveIntervalMinutes() + 5) * 60 * 1000; session1 = pool.getReadSession(); session1.writeAtLeastOnce(new ArrayList()); session1.close(); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); - verify(session, times(3)).singleUse(any(TimestampBound.class)); + // The session pool only keeps MinSessions + MaxIdleSessions alive. + verify(session, times(options.getMinSessions() + options.getMaxIdleSessions())) + .singleUse(any(TimestampBound.class)); pool.closeAsync().get(5L, TimeUnit.SECONDS); } + @Test + public void testMaintainerKeepsWriteProportion() throws Exception { + options = + SessionPoolOptions.newBuilder() + .setMinSessions(10) + .setMaxSessions(20) + .setWriteSessionsFraction(0.5f) + .build(); + final SessionImpl session = mockSession(); + mockKeepAlive(session); + // This is cheating as we are returning the same session each but it makes the verification + // easier. + doAnswer( + new Answer() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + executor.submit( + new Runnable() { + @Override + public void run() { + int sessionCount = invocation.getArgumentAt(0, Integer.class); + SessionConsumerImpl consumer = + invocation.getArgumentAt(2, SessionConsumerImpl.class); + for (int i = 0; i < sessionCount; i++) { + consumer.onSessionReady(session); + } + } + }); + return null; + } + }) + .when(sessionClient) + .asyncBatchCreateSessions(anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); + FakeClock clock = new FakeClock(); + clock.currentTimeMillis = System.currentTimeMillis(); + pool = createPool(clock); + // Wait until all sessions have been created and prepared. + waitForExpectedSessionPool(options.getMinSessions(), options.getWriteSessionsFraction()); + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMinSessions()); + assertThat(pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction())); + + // Run maintainer numKeepAliveCycles. No pings should be executed during these. + runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); + verify(session, never()).singleUse(any(TimestampBound.class)); + // Run maintainer numKeepAliveCycles again. All sessions should now be pinged. + runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); + verify(session, times(options.getMinSessions())).singleUse(any(TimestampBound.class)); + // Verify that all sessions are still in the pool, and that the write fraction is maintained. + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMinSessions()); + assertThat( + pool.getNumberOfAvailableWritePreparedSessions() + + pool.getNumberOfSessionsBeingPrepared()) + .isEqualTo( + (int) Math.ceil(pool.getNumberOfSessionsInPool() * options.getWriteSessionsFraction())); + + // Check out MaxSessions sessions to add additional sessions to the pool. + List sessions = new ArrayList<>(options.getMaxSessions()); + for (int i = 0; i < options.getMaxSessions(); i++) { + sessions.add(pool.getReadSession()); + } + for (Session s : sessions) { + s.close(); + } + // There should be MaxSessions in the pool and the writeFraction should be respected. + waitForExpectedSessionPool(options.getMaxSessions(), options.getWriteSessionsFraction()); + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMaxSessions()); + assertThat(pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo((int) Math.ceil(options.getMaxSessions() * options.getWriteSessionsFraction())); + + // Advance the clock to allow the sessions to time out or be kept alive. + clock.currentTimeMillis += + clock.currentTimeMillis + (options.getKeepAliveIntervalMinutes() + 5) * 60 * 1000; + runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); + // The session pool only keeps MinSessions alive. + verify(session, times(options.getMinSessions())).singleUse(any(TimestampBound.class)); + // Verify that MinSessions and WriteFraction are respected. + waitForExpectedSessionPool(options.getMinSessions(), options.getWriteSessionsFraction()); + assertThat(pool.getNumberOfSessionsInPool()).isEqualTo(options.getMinSessions()); + assertThat(pool.getNumberOfAvailableWritePreparedSessions()) + .isEqualTo((int) Math.ceil(options.getMinSessions() * options.getWriteSessionsFraction())); + + pool.closeAsync().get(5L, TimeUnit.SECONDS); + } + + private void waitForExpectedSessionPool(int expectedSessions, float writeFraction) + throws InterruptedException { + Stopwatch watch = Stopwatch.createStarted(); + while ((pool.getNumberOfSessionsInPool() < expectedSessions + || pool.getNumberOfAvailableWritePreparedSessions() + < Math.ceil(expectedSessions * writeFraction)) + && watch.elapsed(TimeUnit.SECONDS) < 5) { + Thread.sleep(1L); + } + } + @Test public void blockAndTimeoutOnPoolExhaustion() throws Exception { // Try to take a read or a read/write session. These requests should block. @@ -1650,6 +1741,7 @@ public Void call() { private void mockKeepAlive(Session session) { ReadContext context = mock(ReadContext.class); ResultSet resultSet = mock(ResultSet.class); + when(resultSet.next()).thenReturn(true, false); when(session.singleUse(any(TimestampBound.class))).thenReturn(context); when(context.executeQuery(any(Statement.class))).thenReturn(resultSet); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java new file mode 100644 index 00000000000..7262fa163b6 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/StandardBenchmarkMockServer.java @@ -0,0 +1,139 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.StructType.Field; +import com.google.spanner.v1.TypeCode; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; + +/** Standard mock server used for benchmarking. */ +class StandardBenchmarkMockServer { + private static final int NETWORK_LATENCY_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_MIN_TIME = 10; + private static final int BATCH_CREATE_SESSIONS_RND_TIME = 10; + private static final int BEGIN_TRANSACTION_MIN_TIME = 1; + private static final int BEGIN_TRANSACTION_RND_TIME = 1; + private static final int COMMIT_TRANSACTION_MIN_TIME = 5; + private static final int COMMIT_TRANSACTION_RND_TIME = 5; + private static final int ROLLBACK_TRANSACTION_MIN_TIME = 1; + private static final int ROLLBACK_TRANSACTION_RND_TIME = 1; + private static final int EXECUTE_STREAMING_SQL_MIN_TIME = 10; + private static final int EXECUTE_STREAMING_SQL_RND_TIME = 10; + private static final int EXECUTE_SQL_MIN_TIME = 10; + private static final int EXECUTE_SQL_RND_TIME = 10; + + static final Statement UPDATE_STATEMENT = Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2"); + static final Statement INVALID_UPDATE_STATEMENT = + Statement.of("UPDATE NON_EXISTENT_TABLE SET BAR=1 WHERE BAZ=2"); + static final long UPDATE_COUNT = 1L; + static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); + private static final ResultSetMetadata SELECT1_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .setMetadata(SELECT1_METADATA) + .build(); + private MockSpannerServiceImpl mockSpanner; + private Server server; + private LocalChannelProvider channelProvider; + + TransportChannelProvider start() throws IOException { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult(StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); + mockSpanner.putStatementResult( + StatementResult.exception( + INVALID_UPDATE_STATEMENT, + Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); + + mockSpanner.setBatchCreateSessionsExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BATCH_CREATE_SESSIONS_MIN_TIME, BATCH_CREATE_SESSIONS_RND_TIME)); + mockSpanner.setBeginTransactionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + BEGIN_TRANSACTION_MIN_TIME, BEGIN_TRANSACTION_RND_TIME)); + mockSpanner.setCommitExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + COMMIT_TRANSACTION_MIN_TIME, COMMIT_TRANSACTION_RND_TIME)); + mockSpanner.setRollbackExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + ROLLBACK_TRANSACTION_MIN_TIME, ROLLBACK_TRANSACTION_RND_TIME)); + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_STREAMING_SQL_MIN_TIME, EXECUTE_STREAMING_SQL_RND_TIME)); + mockSpanner.setExecuteSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime( + NETWORK_LATENCY_TIME + EXECUTE_SQL_MIN_TIME, EXECUTE_SQL_RND_TIME)); + + String uniqueName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start(); + channelProvider = LocalChannelProvider.create(uniqueName); + + return channelProvider; + } + + void shutdown() throws InterruptedException { + server.shutdown(); + server.awaitTermination(); + } + + MockSpannerServiceImpl getMockSpanner() { + return mockSpanner; + } + + int countRequests(final Class type) { + return Collections2.filter( + mockSpanner.getRequests(), + new Predicate() { + @Override + public boolean apply(AbstractMessage input) { + return input.getClass().equals(type); + } + }) + .size(); + } +}