diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 3e01112366c..8058802a8fc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1751,6 +1751,13 @@ final class PoolMaintainer { */ @VisibleForTesting Instant lastExecutionTime; + /** + * The previous numSessionsAcquired seen by the maintainer. This is used to calculate the + * transactions per second, which again is used to determine whether to randomize the order of + * the session pool. + */ + private long prevNumSessionsAcquired; + boolean closed = false; @GuardedBy("lock") @@ -1794,6 +1801,12 @@ void maintainPool() { return; } running = true; + if (loopFrequency >= 1000L) { + SessionPool.this.transactionsPerSecond = + (SessionPool.this.numSessionsAcquired - prevNumSessionsAcquired) + / (loopFrequency / 1000L); + } + this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired; } Instant currTime = clock.instant(); removeIdleSessions(currTime); @@ -1995,6 +2008,7 @@ enum Position { private final SettableFuture dialect = SettableFuture.create(); private final String databaseRole; private final SessionClient sessionClient; + private final int numChannels; private final ScheduledExecutorService executor; private final ExecutorFactory executorFactory; @@ -2054,6 +2068,9 @@ enum Position { @GuardedBy("lock") private long numIdleSessionsRemoved = 0; + @GuardedBy("lock") + private long transactionsPerSecond = 0L; + @GuardedBy("lock") private long numLeakedSessionsRemoved = 0; @@ -2190,6 +2207,7 @@ private SessionPool( this.executorFactory = executorFactory; this.executor = executor; this.sessionClient = sessionClient; + this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); this.clock = clock; this.initialReleasePosition = initialReleasePosition; this.poolMaintainer = new PoolMaintainer(); @@ -2493,11 +2511,13 @@ private void releaseSession( if (closureFuture != null) { return; } - if (waiters.size() == 0) { + if (waiters.isEmpty()) { // There are no pending waiters. - // Add to a random position if the head of the session pool already contains many sessions - // with the same channel as this one. - if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) { + // Add to a random position if the transactions per second is high or the head of the + // session pool already contains many sessions with the same channel as this one. + if (session.releaseToPosition != Position.RANDOM && shouldRandomize()) { + session.releaseToPosition = Position.RANDOM; + } else if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) { session.releaseToPosition = Position.RANDOM; } else if (session.releaseToPosition == Position.RANDOM && !isNewSession @@ -2532,6 +2552,25 @@ private void releaseSession( } } + /** + * Returns true if the position where we return the session should be random if: + * + *
    + *
  1. The current TPS is higher than the configured threshold. + *
  2. AND the number of sessions checked out is larger than the number of channels. + *
+ * + * The second check prevents the session pool from being randomized when the application is + * running many small, quick queries using a small number of parallel threads. This can cause a + * high TPS, without actually having a high degree of parallelism. + */ + @VisibleForTesting + boolean shouldRandomize() { + return this.options.getRandomizePositionQPSThreshold() > 0 + && this.transactionsPerSecond >= this.options.getRandomizePositionQPSThreshold() + && this.numSessionsInUse >= this.numChannels; + } + private boolean isUnbalanced(PooledSession session) { int channel = session.getChannel(); int numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 80d53a4d71f..2ebe77e1ba2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -64,6 +64,7 @@ public class SessionPoolOptions { private final Duration waitForMinSessions; private final Duration acquireSessionTimeout; private final Position releaseToPosition; + private final long randomizePositionQPSThreshold; /** Property for allowing mocking of session maintenance clock. */ private final Clock poolMaintainerClock; @@ -89,6 +90,7 @@ private SessionPoolOptions(Builder builder) { this.waitForMinSessions = builder.waitForMinSessions; this.acquireSessionTimeout = builder.acquireSessionTimeout; this.releaseToPosition = builder.releaseToPosition; + this.randomizePositionQPSThreshold = builder.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions; this.poolMaintainerClock = builder.poolMaintainerClock; } @@ -118,6 +120,7 @@ public boolean equals(Object o) { && Objects.equals(this.waitForMinSessions, other.waitForMinSessions) && Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout) && Objects.equals(this.releaseToPosition, other.releaseToPosition) + && Objects.equals(this.randomizePositionQPSThreshold, other.randomizePositionQPSThreshold) && Objects.equals( this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions) && Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock); @@ -143,6 +146,7 @@ public int hashCode() { this.waitForMinSessions, this.acquireSessionTimeout, this.releaseToPosition, + this.randomizePositionQPSThreshold, this.inactiveTransactionRemovalOptions, this.poolMaintainerClock); } @@ -263,6 +267,10 @@ Position getReleaseToPosition() { return releaseToPosition; } + long getRandomizePositionQPSThreshold() { + return randomizePositionQPSThreshold; + } + public static Builder newBuilder() { return new Builder(); } @@ -451,6 +459,13 @@ public static class Builder { private Duration waitForMinSessions = Duration.ZERO; private Duration acquireSessionTimeout = Duration.ofSeconds(60); private Position releaseToPosition = getReleaseToPositionFromSystemProperty(); + /** + * The session pool will randomize the position of a session that is being returned when this + * threshold is exceeded. That is: If the transactions per second exceeds this threshold, then + * the session pool will use a random order for the sessions instead of LIFO. The default is 0, + * which means that the option is disabled. + */ + private long randomizePositionQPSThreshold = 0L; private Clock poolMaintainerClock; @@ -487,6 +502,7 @@ private Builder(SessionPoolOptions options) { this.autoDetectDialect = options.autoDetectDialect; this.waitForMinSessions = options.waitForMinSessions; this.acquireSessionTimeout = options.acquireSessionTimeout; + this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -764,6 +780,13 @@ Builder setReleaseToPosition(Position releaseToPosition) { return this; } + Builder setRandomizePositionQPSThreshold(long randomizePositionQPSThreshold) { + Preconditions.checkArgument( + randomizePositionQPSThreshold >= 0L, "randomizePositionQPSThreshold must be >= 0"); + this.randomizePositionQPSThreshold = randomizePositionQPSThreshold; + return this; + } + /** Build a SessionPoolOption object */ public SessionPoolOptions build() { validate(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index f629c0fc1d6..b489511b73b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -18,6 +18,8 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -29,6 +31,7 @@ import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.Position; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; +import com.google.common.base.Preconditions; import io.opencensus.trace.Tracing; import io.opentelemetry.api.OpenTelemetry; import java.util.ArrayList; @@ -116,6 +119,10 @@ private SessionImpl setupMockSession(final SessionImpl session, final ReadContex } private SessionPool createPool() throws Exception { + return createPool(this.options); + } + + private SessionPool createPool(SessionPoolOptions options) throws Exception { // Allow sessions to be added to the head of the pool in all cases in this test, as it is // otherwise impossible to know which session exactly is getting pinged at what point in time. SessionPool pool = @@ -324,4 +331,67 @@ public void testIdleSessions() throws Exception { } assertThat(pool.totalSessions()).isEqualTo(options.getMinSessions()); } + + @Test + public void testRandomizeThreshold() throws Exception { + SessionPool pool = + createPool( + this.options + .toBuilder() + .setMaxSessions(400) + .setLoopFrequency(1000L) + .setRandomizePositionQPSThreshold(4) + .build()); + List sessions; + + // Run a maintenance loop. No sessions have been checked out so far, so the TPS should be 0. + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get and return one session. This means TPS == 1. + returnSessions(1, useSessions(1, pool)); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get and return four sessions. This means TPS == 4, and that no sessions are checked out. + returnSessions(4, useSessions(4, pool)); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get four sessions without returning them. + // This means TPS == 4 and that they are all still checked out. + sessions = useSessions(4, pool); + runMaintenanceLoop(clock, pool, 1); + assertTrue(pool.shouldRandomize()); + // Returning one of the sessions reduces the number of checked out sessions enough to stop the + // randomizing. + returnSessions(1, sessions); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + // Get three more session and run the maintenance loop. + // The TPS is then 3, as we've only gotten 3 sessions since the last maintenance run. + // That means that we should not randomize. + sessions.addAll(useSessions(3, pool)); + runMaintenanceLoop(clock, pool, 1); + assertFalse(pool.shouldRandomize()); + + returnSessions(sessions.size(), sessions); + } + + private List useSessions(int numSessions, SessionPool pool) { + List sessions = new ArrayList<>(numSessions); + for (int i = 0; i < numSessions; i++) { + sessions.add(pool.getSession()); + sessions.get(sessions.size() - 1).singleUse().executeQuery(Statement.of("SELECT 1")).next(); + } + return sessions; + } + + private void returnSessions(int numSessions, List sessions) { + Preconditions.checkArgument(numSessions <= sessions.size()); + for (int i = 0; i < numSessions; i++) { + sessions.remove(0).close(); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java index 23aa626f393..22d10d92a87 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -218,4 +219,31 @@ public void verifyDefaultAcquireSessionTimeout() { assertEquals(Duration.ofSeconds(60), sessionPoolOptions.getAcquireSessionTimeout()); } + + @Test + public void testRandomizePositionQPSThreshold() { + assertEquals(0L, SessionPoolOptions.newBuilder().build().getRandomizePositionQPSThreshold()); + assertEquals( + 4L, + SessionPoolOptions.newBuilder() + .setRandomizePositionQPSThreshold(4L) + .build() + .getRandomizePositionQPSThreshold()); + assertEquals( + 10L, + SessionPoolOptions.newBuilder() + .setRandomizePositionQPSThreshold(4L) + .setRandomizePositionQPSThreshold(10L) + .build() + .getRandomizePositionQPSThreshold()); + assertEquals( + 0L, + SessionPoolOptions.newBuilder() + .setRandomizePositionQPSThreshold(0L) + .build() + .getRandomizePositionQPSThreshold()); + assertThrows( + IllegalArgumentException.class, + () -> SessionPoolOptions.newBuilder().setRandomizePositionQPSThreshold(-1L)); + } }