Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid unbalanced session pool creation #2442

Merged
merged 28 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
bb09e1b
fix: avoid unbalanced session pool creation
olavloite May 19, 2023
c2ef5b1
fix: automatically balance pool
olavloite May 19, 2023
8a44f2b
fix: skip empty pool
olavloite May 19, 2023
75773fd
fix: shuffle if unbalanced
olavloite May 19, 2023
913fa8a
fix: only reset randomness if actually randomized
olavloite May 20, 2023
4130c8c
test: randomize if many sessions are checked out
olavloite May 20, 2023
6c48990
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 20, 2023
f8637bb
test: try with more channels
olavloite May 22, 2023
be9fca2
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 22, 2023
9be0abe
Merge branch 'main' into randomize-session-position-at-first-add
olavloite May 31, 2023
f260eca
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 31, 2023
8939c7e
fix: also consider checked out sessions for unbalanced pool
olavloite Jun 1, 2023
80ded1b
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 1, 2023
8a84488
Merge branch 'main' into randomize-session-position-at-first-add
olavloite Jul 31, 2023
8346c5c
docs: add javadoc for property
olavloite Jul 31, 2023
7a6e536
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 31, 2023
5e3d68d
perf: optimize low-QPS workloads
olavloite Aug 1, 2023
ed25c99
test: only randomize if more than 2 sessions are checked out
olavloite Aug 2, 2023
bc406f4
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 2, 2023
104b36e
test: only skip randomization for existing sessions
olavloite Aug 2, 2023
f5095f3
Merge branch 'randomize-session-position-at-first-add' of github.com:…
olavloite Aug 2, 2023
6885745
Merge branch 'main' into randomize-session-position-at-first-add
olavloite Aug 29, 2023
86909e3
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 29, 2023
8e7ea3a
chore: run formatter
olavloite Aug 29, 2023
c6b1542
Merge branch 'randomize-session-position-at-first-add' of github.com:…
olavloite Aug 29, 2023
30d7edc
chore: address review comments
olavloite Sep 7, 2023
51320ab
Merge branch 'main' into randomize-session-position-at-first-add
olavloite Sep 7, 2023
3c6060a
docs: update comment on how session is added to the pool
olavloite Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1366,12 +1367,19 @@ PooledSession get(final boolean eligibleForLongRunning) {
}
}

final class PooledSession implements Session {
class PooledSession implements Session {
@VisibleForTesting SessionImpl delegate;
private volatile Instant lastUseTime;
private volatile SpannerException lastException;
private volatile boolean allowReplacing = true;

/**
* This ensures that the session is added at a random position in the pool the first time it is
* actually added to the pool.
*/
@GuardedBy("lock")
private Position releaseToPosition = initialReleasePosition;

/**
* Property to mark if the session is eligible to be long-running. This can only be true if the
* session is executing certain types of transactions (for ex - Partitioned DML) which can be
Expand Down Expand Up @@ -1403,6 +1411,13 @@ private PooledSession(SessionImpl delegate) {
this.lastUseTime = clock.instant();
}

int getChannel() {
Long channelHint = (Long) delegate.getOptions().get(SpannerRpc.Option.CHANNEL_HINT);
return channelHint == null
? 0
: (int) (channelHint % sessionClient.getSpanner().getOptions().getNumChannels());
}

@Override
public String toString() {
return getName();
Expand Down Expand Up @@ -1536,7 +1551,7 @@ public void close() {
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
}
releaseSession(this, Position.FIRST);
releaseSession(this, false);
}
}

Expand Down Expand Up @@ -1576,7 +1591,7 @@ private void determineDialectAsync(final SettableFuture<Dialect> dialect) {
// in the database dialect, and there's nothing sensible that we can do with it here.
dialect.setException(t);
} finally {
releaseSession(this, Position.FIRST);
releaseSession(this, false);
}
});
}
Expand Down Expand Up @@ -1830,7 +1845,7 @@ private void keepAliveSessions(Instant currTime) {
logger.log(Level.FINE, "Keeping alive session " + sessionToKeepAlive.getName());
numSessionsToKeepAlive--;
sessionToKeepAlive.keepAlive();
releaseSession(sessionToKeepAlive, Position.FIRST);
releaseSession(sessionToKeepAlive, false);
} catch (SpannerException e) {
handleException(e, sessionToKeepAlive);
}
Expand Down Expand Up @@ -1929,7 +1944,7 @@ private void removeLongRunningSessions(
}
}

private enum Position {
enum Position {
FIRST,
RANDOM
}
Expand Down Expand Up @@ -1962,6 +1977,15 @@ private enum Position {

final PoolMaintainer poolMaintainer;
private final Clock clock;
/**
* initialReleasePosition determines where in the pool sessions are added when they are released
* into the pool the first time. This is always RANDOM in production, but some tests use FIRST to
* be able to verify the order of sessions in the pool. Using RANDOM ensures that we do not get an
* unbalanced session pool where all sessions belonging to one gRPC channel are added to the same
* region in the pool.
*/
private final Position initialReleasePosition;

private final Object lock = new Object();
private final Random random = new Random();

Expand Down Expand Up @@ -2045,6 +2069,7 @@ static SessionPool createPool(
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
sessionClient,
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
Position.RANDOM,
Metrics.getMetricRegistry(),
labelValues);
}
Expand All @@ -2053,20 +2078,22 @@ static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient) {
return createPool(poolOptions, executorFactory, sessionClient, new Clock());
return createPool(poolOptions, executorFactory, sessionClient, new Clock(), Position.RANDOM);
}

static SessionPool createPool(
SessionPoolOptions poolOptions,
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
Clock clock) {
Clock clock,
Position initialReleasePosition) {
return createPool(
poolOptions,
null,
executorFactory,
sessionClient,
clock,
initialReleasePosition,
Metrics.getMetricRegistry(),
SPANNER_DEFAULT_LABEL_VALUES);
}
Expand All @@ -2077,6 +2104,7 @@ static SessionPool createPool(
ExecutorFactory<ScheduledExecutorService> executorFactory,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
MetricRegistry metricRegistry,
List<LabelValue> labelValues) {
SessionPool pool =
Expand All @@ -2087,6 +2115,7 @@ static SessionPool createPool(
executorFactory.get(),
sessionClient,
clock,
initialReleasePosition,
metricRegistry,
labelValues);
pool.initPool();
Expand All @@ -2100,6 +2129,7 @@ private SessionPool(
ScheduledExecutorService executor,
SessionClient sessionClient,
Clock clock,
Position initialReleasePosition,
MetricRegistry metricRegistry,
List<LabelValue> labelValues) {
this.options = options;
Expand All @@ -2108,6 +2138,7 @@ private SessionPool(
this.executor = executor;
this.sessionClient = sessionClient;
this.clock = clock;
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
this.initMetricsCollection(metricRegistry, labelValues);
this.waitOnMinSessionsLatch =
Expand Down Expand Up @@ -2233,7 +2264,7 @@ private void handleException(SpannerException e, PooledSession session) {
if (isSessionNotFound(e)) {
invalidateSession(session);
} else {
releaseSession(session, Position.FIRST);
releaseSession(session, false);
}
}

Expand Down Expand Up @@ -2396,33 +2427,128 @@ private void maybeCreateSession() {
}
}
}

/** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */
private void releaseSession(PooledSession session, Position position) {
private void releaseSession(PooledSession session, boolean isNewSession) {
Preconditions.checkNotNull(session);
synchronized (lock) {
if (closureFuture != null) {
return;
}
if (waiters.size() == 0) {
// No pending waiters
switch (position) {
case RANDOM:
if (!sessions.isEmpty()) {
int pos = random.nextInt(sessions.size() + 1);
sessions.add(pos, session);
break;
}
// fallthrough
case FIRST:
default:
sessions.addFirst(session);
// There are no pending waiters.
// Add to a random position if the head of the session pool already contains many sessions
// with the same channel as this one.
if (session.releaseToPosition == Position.FIRST && isUnbalanced(session)) {
session.releaseToPosition = Position.RANDOM;
} else if (session.releaseToPosition == Position.RANDOM
&& !isNewSession
&& checkedOutSessions.size() <= 2) {
// Do not randomize if there are few other sessions checked out and this session has been
// used. This ensures that this session will be re-used for the next transaction, which is
// more efficient.
session.releaseToPosition = Position.FIRST;
}
if (session.releaseToPosition == Position.RANDOM && !sessions.isEmpty()) {
// A session should only be added at a random position the first time it is added to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the condition !sessions.isEmpty(), apart from the first session, all other sessions will enter the else block and get added to the front of the queue. If this is intended, then isn't this comment inconsistent?

How are we measuring if a session was added for the first time or has been added before. Shouldn't that be a different state variable that get's toggled the first time the session was added to the pool?

Or are you saying session.releaseToPosition == Position.RANDOM returns true only when session was released first time. I guess all other releases will toggle session.releaseToPosition to Position.FIRST

Assuming that session.releaseToPosition = Position.RANDOM indicates first time insertion isn't great right. And it can easily break the algorithm.

WDYT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was indeed outdated. It was initially written before adding the isUnbalanced(..) part to the solution, as I initially thought that it would be enough to only randomize the order when a session was initially added to the pool. That was not enough, because it could easily happen that a session would be used multiple times before it would actually be released into the pool (that is; because there was always a waiter for a session, which meant that the session was given to that waiter instead of being released to the pool). In addition, it could still be that an initial randomization would give an unbalanced pool.

I've updated the comment to reflect the current situation.

Due to the condition !sessions.isEmpty(), apart from the first session, all other sessions will enter the else block and get added to the front of the queue. If this is intended, then isn't this comment inconsistent?

Hmmm.... I'm not sure I understand your question correctly. The !sessions.isEmpty() ensures that only the first session to be released into the pool enters the else block. We do that because adding it at a random place in an empty list does not make sense.

How are we measuring if a session was added for the first time or has been added before. Shouldn't that be a different state variable that get's toggled the first time the session was added to the pool?

The original implementation would only set it to RANDOM when the session was created, and then reset it to FIRST here. So in that implementation, it was a valid indicator for whether it was the first time it was added. With the addition of checking for an unbalanced pool, that has changed (but the comment unfortunately not), and a session can be added at a random position multiple times, but the value is always reset to FIRST after it has been added, as that is the default as long as the pool remains balanced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I had got confused thinking each session when it enters the queue first has to be put in front of the queue. But I think you meant only the first session. Now it's clear.

// the pool or if the pool was deemed unbalanced. All following releases into the pool
// should normally happen at the front of the pool (unless the pool is again deemed to be
// unbalanced).
session.releaseToPosition = Position.FIRST;
int pos = random.nextInt(sessions.size() + 1);
sessions.add(pos, session);
} else {
sessions.addFirst(session);
}
} else {
waiters.poll().put(session);
}
}
}

private boolean isUnbalanced(PooledSession session) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @VisibleForTesting and add a few more unit tests for this method? It's doing a bunch of things which is currently untested with the existing tests.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. I've changed it a bit so we have a static method that does the actual calculation. That method is easier to test than one that depends on all the state fields in the SessionPool class. And I've added different tests for that method.

int channel = session.getChannel();
int numChannels = sessionClient.getSpanner().getOptions().getNumChannels();
return isUnbalanced(channel, this.sessions, this.checkedOutSessions, numChannels);
}

/**
* Returns true if the given list of sessions is considered unbalanced when compared to the
* sessionChannel that is about to be added to the pool.
*
* <p>The method returns true if all the following is true:
*
* <ol>
* <li>The list of sessions is not empty.
* <li>The number of checked out sessions is > 2.
* <li>The number of channels being used by the pool is > 1.
* <li>And at least one of the following is true:
* <ol>
* <li>The first numChannels sessions in the list of sessions contains more than 2
* sessions that use the same channel as the one being added.
* <li>The list of currently checked out sessions contains more than 2 times the the
* number of sessions with the same channel as the one being added than it should in
* order for it to be perfectly balanced. Perfectly balanced in this case means that
* the list should preferably contain size/numChannels sessions of each channel.
* </ol>
* </ol>
*
* @param channelOfSessionBeingAdded the channel number being used by the session that is about to
* be released into the pool
* @param sessions the list of all sessions in the pool
* @param checkedOutSessions the currently checked out sessions of the pool
* @param numChannels the number of channels in use
* @return true if the pool is considered unbalanced, and false otherwise
*/
@VisibleForTesting
static boolean isUnbalanced(
int channelOfSessionBeingAdded,
List<PooledSession> sessions,
Set<PooledSessionFuture> checkedOutSessions,
int numChannels) {
// Do not re-balance the pool if the number of checked out sessions is low, as it is
// better to re-use sessions as much as possible in a low-QPS scenario.
if (sessions.isEmpty() || checkedOutSessions.size() <= 2) {
return false;
}
if (numChannels == 1) {
return false;
}

// Ideally, the first numChannels sessions in the pool should contain exactly one session for
// each channel.
// Check if the first numChannels sessions at the head of the pool already contain more than 2
// sessions that use the same channel as this one. If so, we re-balance.
// We also re-balance the pool in the specific case that the pool uses 2 channels and the first
// two sessions use those two channels.
int maxSessionsAtHeadOfPool = Math.min(numChannels, 3);
int count = 0;
for (int i = 0; i < Math.min(numChannels, sessions.size()); i++) {
PooledSession otherSession = sessions.get(i);
if (channelOfSessionBeingAdded == otherSession.getChannel()) {
count++;
if (count >= maxSessionsAtHeadOfPool) {
return true;
}
}
}
// Ideally, the use of a channel in the checked out sessions is exactly
// numCheckedOut / numChannels
// We check whether we are more than a factor two away from that perfect distribution.
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
// If we are, then we re-balance.
count = 0;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug in the previous version that was found with the additional test cases.

int checkedOutThreshold = Math.max(2, 2 * checkedOutSessions.size() / numChannels);
for (PooledSessionFuture otherSession : checkedOutSessions) {
if (otherSession.isDone() && channelOfSessionBeingAdded == otherSession.get().getChannel()) {
count++;
if (count > checkedOutThreshold) {
return true;
}
}
}
return false;
}

private void handleCreateSessionsFailure(SpannerException e, int count) {
synchronized (lock) {
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -2622,7 +2748,7 @@ public void onSessionReady(SessionImpl session) {
// Release the session to a random position in the pool to prevent the case that a batch
// of sessions that are affiliated with the same channel are all placed sequentially in
// the pool.
releaseSession(pooledSession, Position.RANDOM);
releaseSession(pooledSession, true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.protobuf.Empty;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.threeten.bp.Instant;

abstract class BaseSessionPoolTest {
ScheduledExecutorService mockExecutor;
int sessionIndex;
AtomicLong channelHint = new AtomicLong(0L);

final class TestExecutorFactory implements ExecutorFactory<ScheduledExecutorService> {

Expand Down Expand Up @@ -64,6 +69,9 @@ public void release(ScheduledExecutorService executor) {
@SuppressWarnings("unchecked")
SessionImpl mockSession() {
final SessionImpl session = mock(SessionImpl.class);
Map options = new HashMap<>();
options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement());
when(session.getOptions()).thenReturn(options);
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.PooledSession;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -58,6 +59,7 @@ public void setUp() {
initMocks(this);
when(client.getOptions()).thenReturn(spannerOptions);
when(client.getSessionClient(db)).thenReturn(sessionClient);
when(sessionClient.getSpanner()).thenReturn(client);
when(spannerOptions.getNumChannels()).thenReturn(4);
when(spannerOptions.getDatabaseRole()).thenReturn("role");
setupMockSessionCreation();
Expand Down Expand Up @@ -111,9 +113,11 @@ private SessionImpl setupMockSession(final SessionImpl session) {
}

private SessionPool createPool() throws Exception {
// Allow sessions to be added to the head of the pool in all cases in this test, as it is
// otherwise impossible to know which session exactly is getting pinged at what point in time.
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
SessionPool pool =
SessionPool.createPool(
options, new TestExecutorFactory(), client.getSessionClient(db), clock);
options, new TestExecutorFactory(), client.getSessionClient(db), clock, Position.FIRST);
pool.idleSessionRemovedListener =
input -> {
idledSessions.add(input);
Expand Down
Loading