Skip to content

Commit

Permalink
fix: multiplexed session metrics were not included in refactor move (#…
Browse files Browse the repository at this point in the history
…3088)

The metrics for multiplexed sessions were not included in the
refactoring that moved multiplexed sessions out of the session pool.
This change re-adds those metrics based on the new client for
multiplexed sessions.
  • Loading branch information
olavloite authored May 3, 2024
1 parent 86481b5 commit f3589c4
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ public final void invalidate() {

@Override
public void close() {
session.onTransactionDone();
span.end();
synchronized (lock) {
isClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,14 @@ public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) {
}

@Override
public void close() {}
public void close() {
try {
this.readContextFuture.get().close();
} catch (Throwable ignore) {
// Ignore any errors during close, as this error has already propagated to the user through
// other means.
}
}

/**
* Represents a {@link ReadContext} using a multiplexed session that is not yet ready. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -63,6 +64,8 @@ static class MultiplexedSessionTransaction extends SessionImpl {

private final int singleUseChannelHint;

private boolean done;

MultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
Expand All @@ -73,6 +76,7 @@ static class MultiplexedSessionTransaction extends SessionImpl {
this.client = client;
this.singleUse = singleUse;
this.singleUseChannelHint = singleUseChannelHint;
this.client.numSessionsAcquired.incrementAndGet();
setCurrentSpan(span);
}

Expand Down Expand Up @@ -103,6 +107,20 @@ void onReadDone() {
}
}

@Override
void onTransactionDone() {
boolean markedDone = false;
synchronized (this) {
if (!this.done) {
this.done = true;
markedDone = true;
}
}
if (markedDone) {
client.numSessionsReleased.incrementAndGet();
}
}

@Override
public void close() {
// no-op, we don't want to delete the multiplexed session.
Expand Down Expand Up @@ -152,6 +170,10 @@ public void close() {
private final AtomicReference<ResourceNotFoundException> resourceNotFoundException =
new AtomicReference<>();

private final AtomicLong numSessionsAcquired = new AtomicLong();

private final AtomicLong numSessionsReleased = new AtomicLong();

/**
* This flag is set to true if the server return UNIMPLEMENTED when we try to create a multiplexed
* session. TODO: Remove once this is guaranteed to be available.
Expand Down Expand Up @@ -239,6 +261,14 @@ boolean isValid() {
return resourceNotFoundException.get() == null;
}

AtomicLong getNumSessionsAcquired() {
return this.numSessionsAcquired;
}

AtomicLong getNumSessionsReleased() {
return this.numSessionsReleased;
}

boolean isMultiplexedSessionsSupported() {
return !this.unimplemented.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ void onError(SpannerException spannerException) {}

void onReadDone() {}

void onTransactionDone() {}

TraceWrapper getTracer() {
return tracer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
Expand Down Expand Up @@ -156,7 +155,8 @@ void maybeWaitOnMinSessions() {
}
}

private abstract static class CachedResultSetSupplier implements Supplier<ResultSet> {
private abstract static class CachedResultSetSupplier
implements com.google.common.base.Supplier<ResultSet> {

private ResultSet cached;

Expand Down Expand Up @@ -2265,7 +2265,6 @@ public String getName() {
@Override
public void close() {
synchronized (lock) {
numMultiplexedSessionsReleased++;
if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) {
SessionPool.this.resourceNotFoundException =
MoreObjects.firstNonNull(
Expand Down Expand Up @@ -2771,15 +2770,9 @@ enum Position {
@GuardedBy("lock")
private long numSessionsAcquired = 0;

@GuardedBy("lock")
private long numMultiplexedSessionsAcquired = 0;

@GuardedBy("lock")
private long numSessionsReleased = 0;

@GuardedBy("lock")
private long numMultiplexedSessionsReleased = 0;

@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

Expand Down Expand Up @@ -2830,7 +2823,9 @@ static SessionPool createPool(
SessionClient sessionClient,
TraceWrapper tracer,
List<LabelValue> labelValues,
Attributes attributes) {
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
final SessionPoolOptions sessionPoolOptions = spannerOptions.getSessionPoolOptions();

// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
Expand All @@ -2846,7 +2841,9 @@ static SessionPool createPool(
tracer,
labelValues,
spannerOptions.getOpenTelemetry(),
attributes);
attributes,
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
}

static SessionPool createPool(
Expand Down Expand Up @@ -2884,7 +2881,9 @@ static SessionPool createPool(
tracer,
SPANNER_DEFAULT_LABEL_VALUES,
openTelemetry,
null);
null,
new AtomicLong(),
new AtomicLong());
}

static SessionPool createPool(
Expand All @@ -2898,7 +2897,9 @@ static SessionPool createPool(
TraceWrapper tracer,
List<LabelValue> labelValues,
OpenTelemetry openTelemetry,
Attributes attributes) {
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
SessionPool pool =
new SessionPool(
poolOptions,
Expand All @@ -2912,7 +2913,9 @@ static SessionPool createPool(
tracer,
labelValues,
openTelemetry,
attributes);
attributes,
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
pool.initPool();
return pool;
}
Expand All @@ -2929,7 +2932,9 @@ private SessionPool(
TraceWrapper tracer,
List<LabelValue> labelValues,
OpenTelemetry openTelemetry,
Attributes attributes) {
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
this.options = options;
this.databaseRole = databaseRole;
this.executorFactory = executorFactory;
Expand All @@ -2940,8 +2945,13 @@ private SessionPool(
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
this.tracer = tracer;
this.initOpenCensusMetricsCollection(metricRegistry, labelValues);
this.initOpenTelemetryMetricsCollection(openTelemetry, attributes);
this.initOpenCensusMetricsCollection(
metricRegistry,
labelValues,
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
this.initOpenTelemetryMetricsCollection(
openTelemetry, attributes, numMultiplexedSessionsAcquired, numMultiplexedSessionsReleased);
this.waitOnMinSessionsLatch =
options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0);
this.waitOnMultiplexedSessionsLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -3143,7 +3153,7 @@ boolean isValid() {

/**
* Returns a multiplexed session. The method fallbacks to a regular session if {@link
* SessionPoolOptions#useMultiplexedSession} is not set.
* SessionPoolOptions#getUseMultiplexedSession} is not set.
*/
SessionFutureWrapper getMultiplexedSessionWithFallback() throws SpannerException {
if (useMultiplexedSessions()) {
Expand Down Expand Up @@ -3250,8 +3260,6 @@ private void incrementNumSessionsInUse(boolean isMultiplexed) {
maxSessionsInUse = numSessionsInUse;
}
numSessionsAcquired++;
} else {
numMultiplexedSessionsAcquired++;
}
}
}
Expand Down Expand Up @@ -3775,7 +3783,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
* exporter, it allows users to monitor client behavior.
*/
private void initOpenCensusMetricsCollection(
MetricRegistry metricRegistry, List<LabelValue> labelValues) {
MetricRegistry metricRegistry,
List<LabelValue> labelValues,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
if (!SpannerOptions.isEnabledOpenCensusMetrics()) {
return;
}
Expand Down Expand Up @@ -3860,18 +3871,14 @@ private void initOpenCensusMetricsCollection(
labelValuesWithRegularSessions, this, sessionPool -> sessionPool.numSessionsAcquired);
numAcquiredSessionsMetric.removeTimeSeries(labelValuesWithMultiplexedSessions);
numAcquiredSessionsMetric.createTimeSeries(
labelValuesWithMultiplexedSessions,
this,
sessionPool -> sessionPool.numMultiplexedSessionsAcquired);
labelValuesWithMultiplexedSessions, this, unused -> numMultiplexedSessionsAcquired.get());

numReleasedSessionsMetric.removeTimeSeries(labelValuesWithRegularSessions);
numReleasedSessionsMetric.createTimeSeries(
labelValuesWithRegularSessions, this, sessionPool -> sessionPool.numSessionsReleased);
numReleasedSessionsMetric.removeTimeSeries(labelValuesWithMultiplexedSessions);
numReleasedSessionsMetric.createTimeSeries(
labelValuesWithMultiplexedSessions,
this,
sessionPool -> sessionPool.numMultiplexedSessionsReleased);
labelValuesWithMultiplexedSessions, this, unused -> numMultiplexedSessionsReleased.get());

List<LabelValue> labelValuesWithBeingPreparedType = new ArrayList<>(labelValues);
labelValuesWithBeingPreparedType.add(NUM_SESSIONS_BEING_PREPARED);
Expand Down Expand Up @@ -3909,7 +3916,10 @@ private void initOpenCensusMetricsCollection(
* an exporter, it allows users to monitor client behavior.
*/
private void initOpenTelemetryMetricsCollection(
OpenTelemetry openTelemetry, Attributes attributes) {
OpenTelemetry openTelemetry,
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
if (openTelemetry == null || !SpannerOptions.isEnabledOpenTelemetryMetrics()) {
return;
}
Expand Down Expand Up @@ -3981,7 +3991,8 @@ private void initOpenTelemetryMetricsCollection(
.buildWithCallback(
measurement -> {
measurement.record(this.numSessionsAcquired, attributesRegularSession);
measurement.record(this.numMultiplexedSessionsAcquired, attributesMultiplexedSession);
measurement.record(
numMultiplexedSessionsAcquired.get(), attributesMultiplexedSession);
});

meter
Expand All @@ -3991,7 +4002,8 @@ private void initOpenTelemetryMetricsCollection(
.buildWithCallback(
measurement -> {
measurement.record(this.numSessionsReleased, attributesRegularSession);
measurement.record(this.numMultiplexedSessionsReleased, attributesMultiplexedSession);
measurement.record(
numMultiplexedSessionsReleased.get(), attributesMultiplexedSession);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -271,17 +272,29 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
attributesBuilder.put("database", db.getDatabase());
attributesBuilder.put("instance_id", db.getInstanceId().getName());

boolean useMultiplexedSession =
getOptions().getSessionPoolOptions().getUseMultiplexedSession();
MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient =
useMultiplexedSession
? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db))
: null;
AtomicLong numMultiplexedSessionsAcquired =
useMultiplexedSession
? multiplexedSessionDatabaseClient.getNumSessionsAcquired()
: new AtomicLong();
AtomicLong numMultiplexedSessionsReleased =
useMultiplexedSession
? multiplexedSessionDatabaseClient.getNumSessionsReleased()
: new AtomicLong();
SessionPool pool =
SessionPool.createPool(
getOptions(),
SpannerImpl.this.getSessionClient(db),
this.tracer,
labelValues,
attributesBuilder.build());
MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient =
getOptions().getSessionPoolOptions().getUseMultiplexedSession()
? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db))
: null;
attributesBuilder.build(),
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient =
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3859,12 +3859,12 @@ public void testCreateSessionsFailure_shouldNotPropagateToCloseMethod() {
// Simulate session creation failures on the backend.
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
DatabaseClient client =
spannerWithEmptySessionPool.getDatabaseClient(
DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
// This will not cause any failure as getting a session from the pool is guaranteed to be
// non-blocking, and any exceptions will be delayed until actual query execution.
mockSpanner.freeze();
DatabaseClient client =
spannerWithEmptySessionPool.getDatabaseClient(
DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) {
mockSpanner.unfreeze();
SpannerException e = assertThrows(SpannerException.class, rs::next);
Expand Down
Loading

0 comments on commit f3589c4

Please sign in to comment.