From 609dae0f1bec2b6720f017f78f808ab7b73160a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Sat, 20 Jan 2024 12:30:06 +0100 Subject: [PATCH 1/2] chore: randomize session pool order based on TPS --- .../com/google/cloud/spanner/SessionPool.java | 48 +++++++++++-- .../cloud/spanner/SessionPoolOptions.java | 27 +++++++ .../spanner/SessionPoolMaintainerTest.java | 70 +++++++++++++++++++ .../google/cloud/spanner/SessionPoolTest.java | 16 +++-- 4 files changed, 151 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 54a0a292cd8..a22144d1004 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1751,6 +1751,13 @@ final class PoolMaintainer { */ @VisibleForTesting Instant lastExecutionTime; + /** + * The previous numSessionsAcquired seen by the maintainer. This is used to calculate the + * transactions per second, which again is used to determine whether to randomize the order of + * the session pool. + */ + private long prevNumSessionsAcquired; + boolean closed = false; @GuardedBy("lock") @@ -1794,6 +1801,12 @@ void maintainPool() { return; } running = true; + if (loopFrequency >= 1000L) { + SessionPool.this.transactionsPerSecond = + (SessionPool.this.numSessionsAcquired - prevNumSessionsAcquired) + / (loopFrequency / 1000L); + } + this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired; } Instant currTime = clock.instant(); removeIdleSessions(currTime); @@ -1995,6 +2008,7 @@ enum Position { private final SettableFuture dialect = SettableFuture.create(); private final String databaseRole; private final SessionClient sessionClient; + private final int numChannels; private final ScheduledExecutorService executor; private final ExecutorFactory executorFactory; @@ -2054,6 +2068,9 @@ enum Position { @GuardedBy("lock") private long numIdleSessionsRemoved = 0; + @GuardedBy("lock") + private long transactionsPerSecond = 0L; + @GuardedBy("lock") private long numLeakedSessionsRemoved = 0; @@ -2160,6 +2177,7 @@ private SessionPool( this.executorFactory = executorFactory; this.executor = executor; this.sessionClient = sessionClient; + this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); this.clock = clock; this.initialReleasePosition = initialReleasePosition; this.poolMaintainer = new PoolMaintainer(); @@ -2458,11 +2476,13 @@ private void releaseSession(PooledSession session, boolean isNewSession) { if (closureFuture != null) { return; } - if (waiters.size() == 0) { + if (waiters.isEmpty()) { // There are no pending waiters. - // Add to a random position if the head of the session pool already contains many sessions - // with the same channel as this one. - if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) { + // Add to a random position if the transactions per second is high or the head of the + // session pool already contains many sessions with the same channel as this one. + if (session.releaseToPosition != Position.RANDOM && shouldRandomize()) { + session.releaseToPosition = Position.RANDOM; + } else if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) { session.releaseToPosition = Position.RANDOM; } else if (session.releaseToPosition == Position.RANDOM && !isNewSession @@ -2491,6 +2511,26 @@ private void releaseSession(PooledSession session, boolean isNewSession) { } } + /** + * Returns true if the position where we return the session should be random if: + * + *
    + *
  1. The current TPS is higher than the configured threshold. + *
  2. AND the number of sessions checked out is larger than the number of channels. + *
+ * + * The second check prevents the session pool from being randomized when the application is + * running many small, quick queries using a small number of parallel threads. This can cause a + * high TPS, without actually having a high degree of parallelism. + */ + @VisibleForTesting + boolean shouldRandomize() { + return this.options.getRandomizePositionTransactionsPerSecondThreshold() > 0 + && this.transactionsPerSecond + >= this.options.getRandomizePositionTransactionsPerSecondThreshold() + && this.numSessionsInUse >= this.numChannels; + } + private boolean isUnbalanced(PooledSession session) { int channel = session.getChannel(); int numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 80d53a4d71f..650ec2dd687 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -64,6 +64,7 @@ public class SessionPoolOptions { private final Duration waitForMinSessions; private final Duration acquireSessionTimeout; private final Position releaseToPosition; + private final long randomizePositionTransactionsPerSecondThreshold; /** Property for allowing mocking of session maintenance clock. */ private final Clock poolMaintainerClock; @@ -89,6 +90,8 @@ private SessionPoolOptions(Builder builder) { this.waitForMinSessions = builder.waitForMinSessions; this.acquireSessionTimeout = builder.acquireSessionTimeout; this.releaseToPosition = builder.releaseToPosition; + this.randomizePositionTransactionsPerSecondThreshold = + builder.randomizePositionTransactionsPerSecondThreshold; this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions; this.poolMaintainerClock = builder.poolMaintainerClock; } @@ -118,6 +121,9 @@ public boolean equals(Object o) { && Objects.equals(this.waitForMinSessions, other.waitForMinSessions) && Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout) && Objects.equals(this.releaseToPosition, other.releaseToPosition) + && Objects.equals( + this.randomizePositionTransactionsPerSecondThreshold, + other.randomizePositionTransactionsPerSecondThreshold) && Objects.equals( this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions) && Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock); @@ -143,6 +149,7 @@ public int hashCode() { this.waitForMinSessions, this.acquireSessionTimeout, this.releaseToPosition, + this.randomizePositionTransactionsPerSecondThreshold, this.inactiveTransactionRemovalOptions, this.poolMaintainerClock); } @@ -263,6 +270,10 @@ Position getReleaseToPosition() { return releaseToPosition; } + long getRandomizePositionTransactionsPerSecondThreshold() { + return randomizePositionTransactionsPerSecondThreshold; + } + public static Builder newBuilder() { return new Builder(); } @@ -451,6 +462,13 @@ public static class Builder { private Duration waitForMinSessions = Duration.ZERO; private Duration acquireSessionTimeout = Duration.ofSeconds(60); private Position releaseToPosition = getReleaseToPositionFromSystemProperty(); + /** + * The session pool will randomize the position of a session that is being returned when this + * threshold is exceeded. That is: If the transactions per second exceeds this threshold, then + * the session pool will use a random order for the sessions instead of LIFO. The default is 0, + * which means that the option is disabled. + */ + private long randomizePositionTransactionsPerSecondThreshold = 0L; private Clock poolMaintainerClock; @@ -487,6 +505,8 @@ private Builder(SessionPoolOptions options) { this.autoDetectDialect = options.autoDetectDialect; this.waitForMinSessions = options.waitForMinSessions; this.acquireSessionTimeout = options.acquireSessionTimeout; + this.randomizePositionTransactionsPerSecondThreshold = + options.randomizePositionTransactionsPerSecondThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -764,6 +784,13 @@ Builder setReleaseToPosition(Position releaseToPosition) { return this; } + Builder setRandomizePositionTransactionsPerSecondThreshold( + long randomizePositionTransactionsPerSecondThreshold) { + this.randomizePositionTransactionsPerSecondThreshold = + randomizePositionTransactionsPerSecondThreshold; + return this; + } + /** Build a SessionPoolOption object */ public SessionPoolOptions build() { validate(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index 217e214c832..f58f6c06907 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -17,6 +17,8 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -28,6 +30,7 @@ import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.Position; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -113,6 +116,10 @@ private SessionImpl setupMockSession(final SessionImpl session, final ReadContex } private SessionPool createPool() throws Exception { + return createPool(this.options); + } + + private SessionPool createPool(SessionPoolOptions options) throws Exception { // Allow sessions to be added to the head of the pool in all cases in this test, as it is // otherwise impossible to know which session exactly is getting pinged at what point in time. SessionPool pool = @@ -303,4 +310,67 @@ public void testIdleSessions() throws Exception { } assertThat(pool.totalSessions()).isEqualTo(options.getMinSessions()); } + + @Test + public void testRandomizeThreshold() throws Exception { + SessionPool pool = + createPool( + this.options + .toBuilder() + .setMaxSessions(400) + .setLoopFrequency(1000L) + .setRandomizePositionTransactionsPerSecondThreshold(4) + .build()); + List sessions; + + // Run a maintenance loop. No sessions have been checked out so far, so the TPS should be 0. + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get and return one session. This means TPS == 1. + returnSessions(1, useSessions(1, pool)); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get and return four sessions. This means TPS == 4, and that no sessions are checked out. + returnSessions(4, useSessions(4, pool)); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get four sessions without returning them. + // This means TPS == 4 and that they are all still checked out. + sessions = useSessions(4, pool); + runMaintenanceLoop(clock, pool, 1); + assertTrue(pool.shouldRandomize()); + // Returning one of the sessions reduces the number of checked out sessions enough to stop the + // randomizing. + returnSessions(1, sessions); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get three more session and run the maintenance loop. + // The TPS is then 3, as we've only gotten 3 sessions since the last maintenance run. + // That means that we should not randomize. + sessions.addAll(useSessions(3, pool)); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + returnSessions(sessions.size(), sessions); + } + + private List useSessions(int numSessions, SessionPool pool) { + List sessions = new ArrayList<>(numSessions); + for (int i = 0; i < numSessions; i++) { + sessions.add(pool.getSession()); + sessions.get(sessions.size() - 1).singleUse().executeQuery(Statement.of("SELECT 1")).next(); + } + return sessions; + } + + private void returnSessions(int numSessions, List sessions) { + Preconditions.checkArgument(numSessions <= sessions.size()); + for (int i = 0; i < numSessions; i++) { + sessions.remove(0).close(); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 65c7c7c03c1..7c58a0526bb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -117,8 +117,8 @@ public class SessionPoolTest extends BaseSessionPoolTest { DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused"); SessionPool pool; SessionPoolOptions options; - private String sessionName = String.format("%s/sessions/s", db.getName()); - private String TEST_DATABASE_ROLE = "my-role"; + private final String sessionName = String.format("%s/sessions/s", db.getName()); + private static final String TEST_DATABASE_ROLE = "my-role"; @Parameters(name = "min sessions = {0}") public static Collection data() { @@ -126,6 +126,10 @@ public static Collection data() { } private SessionPool createPool() { + return createPool(this.options); + } + + private SessionPool createPool(SessionPoolOptions options) { return SessionPool.createPool(options, new TestExecutorFactory(), client.getSessionClient(db)); } @@ -286,7 +290,7 @@ public void poolFifo() throws Exception { public void poolAllPositions() throws Exception { int maxAttempts = 100; setupMockSessionCreation(); - for (Position position : Position.values()) { + for (Position position : new Position[] {Position.LAST}) { runWithSystemProperty( "com.google.cloud.spanner.session_pool_release_to_position", position.name(), @@ -294,14 +298,14 @@ public void poolAllPositions() throws Exception { int attempt = 0; while (attempt < maxAttempts) { int numSessions = 5; - options = - options + SessionPoolOptions options = + this.options .toBuilder() .setMinSessions(numSessions) .setMaxSessions(numSessions) .setWaitForMinSessions(Duration.ofSeconds(10L)) .build(); - pool = createPool(); + pool = createPool(options); pool.maybeWaitOnMinSessions(); // First check out and release the sessions twice to the pool, so we know that we have // finalized the position of them. From fd44afd84a8d5910a1eaf75bb6f922fddf4e66ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Sat, 20 Jan 2024 12:34:34 +0100 Subject: [PATCH 2/2] chore: remove unnecessary changes --- .../google/cloud/spanner/SessionPoolTest.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 7c58a0526bb..65c7c7c03c1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -117,8 +117,8 @@ public class SessionPoolTest extends BaseSessionPoolTest { DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused"); SessionPool pool; SessionPoolOptions options; - private final String sessionName = String.format("%s/sessions/s", db.getName()); - private static final String TEST_DATABASE_ROLE = "my-role"; + private String sessionName = String.format("%s/sessions/s", db.getName()); + private String TEST_DATABASE_ROLE = "my-role"; @Parameters(name = "min sessions = {0}") public static Collection data() { @@ -126,10 +126,6 @@ public static Collection data() { } private SessionPool createPool() { - return createPool(this.options); - } - - private SessionPool createPool(SessionPoolOptions options) { return SessionPool.createPool(options, new TestExecutorFactory(), client.getSessionClient(db)); } @@ -290,7 +286,7 @@ public void poolFifo() throws Exception { public void poolAllPositions() throws Exception { int maxAttempts = 100; setupMockSessionCreation(); - for (Position position : new Position[] {Position.LAST}) { + for (Position position : Position.values()) { runWithSystemProperty( "com.google.cloud.spanner.session_pool_release_to_position", position.name(), @@ -298,14 +294,14 @@ public void poolAllPositions() throws Exception { int attempt = 0; while (attempt < maxAttempts) { int numSessions = 5; - SessionPoolOptions options = - this.options + options = + options .toBuilder() .setMinSessions(numSessions) .setMaxSessions(numSessions) .setWaitForMinSessions(Duration.ofSeconds(10L)) .build(); - pool = createPool(options); + pool = createPool(); pool.maybeWaitOnMinSessions(); // First check out and release the sessions twice to the pool, so we know that we have // finalized the position of them.