Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include stack trace of checked out sessions in exception #3092

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -577,6 +579,7 @@ public PooledSessionFuture replaceSession(
numSessionsInUse--;
numSessionsReleased++;
checkedOutSessions.remove(session);
markedCheckedOutSessions.remove(session);
}
session.leakedException = null;
invalidateSession(session.get());
Expand Down Expand Up @@ -1333,6 +1336,9 @@ void clearLeakedException() {
private void markCheckedOut() {
if (options.isTrackStackTraceOfSessionCheckout()) {
this.leakedException = new LeakedSessionException();
synchronized (SessionPool.this.lock) {
SessionPool.this.markedCheckedOutSessions.add(this);
}
}
}

Expand Down Expand Up @@ -1526,6 +1532,7 @@ public ApiFuture<Empty> asyncClose() {
synchronized (lock) {
leakedException = null;
checkedOutSessions.remove(this);
markedCheckedOutSessions.remove(this);
}
}
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
Expand Down Expand Up @@ -2347,7 +2354,8 @@ private PooledSession pollUninterruptiblyWithTimeout(
"Timed out after waiting "
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
+ " or increase the number of sessions in the session pool.");
+ " or increase the number of sessions in the session pool.\n"
+ createCheckedOutSessionsStackTraces());
if (waiter.setException(exception)) {
// Only throw the exception if setting it on the waiter was successful. The
// waiter.setException(..) method returns false if some other thread in the meantime
Expand Down Expand Up @@ -2794,6 +2802,9 @@ enum Position {
@VisibleForTesting
final Set<PooledSessionFuture> checkedOutSessions = new HashSet<>();

@GuardedBy("lock")
private final Set<PooledSessionFuture> markedCheckedOutSessions = new HashSet<>();

private final SessionConsumer sessionConsumer = new SessionConsumerImpl();

private final MultiplexedSessionInitializationConsumer multiplexedSessionInitializationConsumer =
Expand Down Expand Up @@ -3012,6 +3023,13 @@ int getNumberOfSessionsInUse() {
}
}

@VisibleForTesting
int getMaxSessionsInUse() {
synchronized (lock) {
return maxSessionsInUse;
}
}

@VisibleForTesting
double getRatioOfSessionsInUse() {
synchronized (lock) {
Expand Down Expand Up @@ -3266,22 +3284,54 @@ private void incrementNumSessionsInUse(boolean isMultiplexed) {

private void maybeCreateSession() {
ISpan span = tracer.getCurrentSpan();
boolean throwResourceExhaustedException = false;
synchronized (lock) {
if (numWaiters() >= numSessionsBeingCreated) {
if (canCreateSession()) {
span.addAnnotation("Creating sessions");
createSessions(getAllowedCreateSessions(options.getIncStep()), false);
} else if (options.isFailIfPoolExhausted()) {
span.addAnnotation("Pool exhausted. Failing");
// throw specific exception
throw newSpannerException(
ErrorCode.RESOURCE_EXHAUSTED,
"No session available in the pool. Maximum number of sessions in the pool can be"
+ " overridden by invoking SessionPoolOptions#Builder#setMaxSessions. Client can be made to block"
+ " rather than fail by setting SessionPoolOptions#Builder#setBlockIfPoolExhausted.");
throwResourceExhaustedException = true;
}
}
}
if (!throwResourceExhaustedException) {
return;
}
span.addAnnotation("Pool exhausted. Failing");

String message =
"No session available in the pool. Maximum number of sessions in the pool can be"
+ " overridden by invoking SessionPoolOptions#Builder#setMaxSessions. Client can be made to block"
+ " rather than fail by setting SessionPoolOptions#Builder#setBlockIfPoolExhausted.\n"
+ createCheckedOutSessionsStackTraces();
throw newSpannerException(ErrorCode.RESOURCE_EXHAUSTED, message);
}

private StringBuilder createCheckedOutSessionsStackTraces() {
List<PooledSessionFuture> currentlyCheckedOutSessions;
synchronized (lock) {
currentlyCheckedOutSessions = new ArrayList<>(this.markedCheckedOutSessions);
}

// Create the error message without holding the lock, as we are potentially looping through a
// large set, and analyzing a large number of stack traces.
StringBuilder stackTraces =
new StringBuilder(
"There are currently "
+ currentlyCheckedOutSessions.size()
+ " sessions checked out:\n\n");
if (options.isTrackStackTraceOfSessionCheckout()) {
for (PooledSessionFuture session : currentlyCheckedOutSessions) {
if (session.leakedException != null) {
StringWriter writer = new StringWriter();
PrintWriter printWriter = new PrintWriter(writer);
session.leakedException.printStackTrace(printWriter);
stackTraces.append(writer).append("\n\n");
}
}
}
return stackTraces;
}

private void releaseSession(Tuple<PooledSession, Integer> sessionWithPosition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5060,6 +5060,62 @@ public void testRetryOnResourceExhausted() {
}
}

@Test
public void testSessionPoolExhaustedError_containsStackTraces() {
try (Spanner spanner =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setFailIfPoolExhausted()
.setMinSessions(2)
.setMaxSessions(4)
.setWaitForMinSessions(Duration.ofSeconds(10L))
.build())
.build()
.getService()) {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
List<TransactionManager> transactions = new ArrayList<>();
// Deliberately leak 4 sessions.
for (int i = 0; i < 4; i++) {
// Get a transaction manager without doing anything with it. This will reserve a session
// from
// the pool, but not increase the number of sessions marked as in use.
transactions.add(client.transactionManager());
}
// Trying to get yet another transaction will fail.
// NOTE: This fails directly, because we have set the setFailIfPoolExhausted() option.
SpannerException spannerException =
assertThrows(SpannerException.class, client::transactionManager);
assertEquals(ErrorCode.RESOURCE_EXHAUSTED, spannerException.getErrorCode());
assertTrue(
spannerException.getMessage(),
spannerException.getMessage().contains("There are currently 4 sessions checked out:"));
assertTrue(
spannerException.getMessage(),
spannerException.getMessage().contains("Session was checked out from the pool at"));

SessionPool pool = ((DatabaseClientImpl) client).pool;
// Verify that there are no sessions in the pool.
assertEquals(0, pool.getNumberOfSessionsInPool());
// Verify that the sessions have not (yet) been marked as in use.
assertEquals(0, pool.getNumberOfSessionsInUse());
assertEquals(0, pool.getMaxSessionsInUse());
// Verify that we have 4 sessions in the pool.
assertEquals(4, pool.getTotalSessionsPlusNumSessionsBeingCreated());

// Release the sessions back into the pool.
for (TransactionManager transaction : transactions) {
transaction.close();
}
// Closing the transactions should return the sessions to the pool.
assertEquals(4, pool.getNumberOfSessionsInPool());
}
}

static void assertAsString(String expected, ResultSet resultSet, int col) {
assertEquals(expected, resultSet.getValue(col).getAsString());
assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList());
Expand Down
Loading