Skip to content

Commit

Permalink
perf: close sessions async (#24)
Browse files Browse the repository at this point in the history
* perf: close sessions async

Sessions should be closed using an async gRPC call in order
to speed up the closing of a large session pool. Instead of
using its own executor, which is limited to 8 worker threads,
to execute asynchronous delete session calls, the session pool
should use the asynchronous call option in gRPC. This allows a
larger number of asynchronous delete session calls to be executed
in parallel and speeds up closing a session pool with a large
number of sessions.

Testing against a real Cloud Spanner database with a session pool
containing 1,000 sessions shows the following performance for
closing the session pool:

Before (3 runs):
6603ms
8169ms
8367ms

After (3 runs):
1297ms
1710ms
1851ms

Fixes #19.

* fix: wait for test servers to terminate

* remove tracing for async call

* do not use directExecutor which could be a gRPC thread

* fix: return failed future instead of throwing exception

* fix: remove commented code
  • Loading branch information
olavloite authored and skuruppu committed Jan 23, 2020
1 parent 11e4a90 commit ab25087
Show file tree
Hide file tree
Showing 17 changed files with 111 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.protobuf.Empty;

/**
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
* Spanner database.
Expand Down Expand Up @@ -54,4 +57,10 @@ public interface Session extends DatabaseClient, AutoCloseable {

@Override
void close();

/**
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
* the operation progress.
*/
ApiFuture<Empty> asyncClose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
Expand All @@ -27,6 +28,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
Expand Down Expand Up @@ -196,6 +198,11 @@ public void prepareReadWriteTransaction() {
readyTransactionId = beginTransaction();
}

@Override
public ApiFuture<Empty> asyncClose() {
return spanner.getRpc().asyncDeleteSession(name, options);
}

@Override
public void close() {
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand All @@ -35,6 +37,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import io.opencensus.common.Scope;
import io.opencensus.trace.Annotation;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -767,6 +770,12 @@ public TransactionRunner readWriteTransaction() {
return new SessionPoolTransactionRunner(SessionPool.this, this);
}

@Override
public ApiFuture<Empty> asyncClose() {
close();
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}

@Override
public void close() {
synchronized (lock) {
Expand Down Expand Up @@ -999,7 +1008,7 @@ private void closeIdleSessions(Instant currTime) {
}
for (PooledSession sess : sessionsToClose) {
logger.log(Level.FINE, "Closing session {0}", sess.getName());
closeSession(sess);
closeSessionAsync(sess);
}
}

Expand Down Expand Up @@ -1612,37 +1621,27 @@ int totalSessions() {
}
}

private void closeSessionAsync(final PooledSession sess) {
executor.submit(
private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
ApiFuture<Empty> res = sess.delegate.asyncClose();
res.addListener(
new Runnable() {
@Override
public void run() {
closeSession(sess);
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
});
}

private void closeSession(PooledSession sess) {
try {
sess.delegate.close();
} catch (SpannerException e) {
// Backend will delete these sessions after a while even if we fail to close them.
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
}
} finally {
synchronized (lock) {
allSessions.remove(sess);
if (isClosed()) {
decrementPendingClosures(1);
return;
}
// Create a new session if needed to unblock some waiter.
if (numWaiters() > numSessionsBeingCreated) {
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
}
}
}
},
executor);
return res;
}

private void prepareSession(final PooledSession sess) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
Expand Down Expand Up @@ -523,9 +524,14 @@ public Session createSession(
@Override
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException {
get(asyncDeleteSession(sessionName, options));
}

@Override
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
GrpcCallContext context = newCallContext(options, sessionName);
get(spannerStub.deleteSessionCallable().futureCall(request, context));
return spannerStub.deleteSessionCallable().futureCall(request, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner.spi.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -219,6 +220,9 @@ Session createSession(

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
throws SpannerException;

StreamingCall read(
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.SessionPool.Clock;
import com.google.protobuf.Empty;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -61,6 +63,7 @@ SessionImpl mockSession() {
when(session.getName())
.thenReturn(
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
sessionIndex++;
return session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ public static void startStaticServer() throws IOException {
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
spannerClient.close();
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -55,13 +56,19 @@ public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
String uniqueName = InProcessServerBuilder.generateName();
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
server =
InProcessServerBuilder.forName(uniqueName)
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() {
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPool.SessionConsumerImpl;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -152,14 +155,15 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable {
});
when(mockResult.next()).thenReturn(true);
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
synchronized (lock) {
if (expiredSessions.contains(session.getName())) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found");
return ApiFutures.immediateFailedFuture(
SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "Session not found"));
}
if (sessions.remove(session.getName()) == null) {
setFailed(closedSessions.get(session.getName()));
Expand All @@ -169,11 +173,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
minSessionsWhenSessionClosed = sessions.size();
}
}
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();

doAnswer(
new Answer<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
Expand All @@ -45,6 +47,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
Expand Down Expand Up @@ -217,8 +220,8 @@ public void run() {
leakedSession.clearLeakedException();
session1.close();
pool.closeAsync().get(5L, TimeUnit.SECONDS);
verify(mockSession1).close();
verify(mockSession2).close();
verify(mockSession1).asyncClose();
verify(mockSession2).asyncClose();
}

@Test
Expand Down Expand Up @@ -874,16 +877,16 @@ public void run() {
.asyncBatchCreateSessions(Mockito.eq(1), any(SessionConsumer.class));
for (Session session : new Session[] {session1, session2, session3}) {
doAnswer(
new Answer<Void>() {
new Answer<ApiFuture<Empty>>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
public ApiFuture<Empty> answer(InvocationOnMock invocation) throws Throwable {
numSessionClosed.incrementAndGet();
return null;
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
})
.when(session)
.close();
.asyncClose();
}
FakeClock clock = new FakeClock();
clock.currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -1161,6 +1164,8 @@ public void testSessionNotFoundReadWriteTransaction() {
SpannerRpc.StreamingCall closedStreamingCall = mock(SpannerRpc.StreamingCall.class);
doThrow(sessionNotFound).when(closedStreamingCall).request(Mockito.anyInt());
SpannerRpc rpc = mock(SpannerRpc.class);
when(rpc.asyncDeleteSession(Mockito.anyString(), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(rpc.executeQuery(
any(ExecuteSqlRequest.class), any(ResultStreamConsumer.class), any(Map.class)))
.thenReturn(closedStreamingCall);
Expand All @@ -1177,13 +1182,17 @@ public void testSessionNotFoundReadWriteTransaction() {
hasPreparedTransaction ? ByteString.copyFromUtf8("test-txn") : null;
final TransactionContextImpl closedTransactionContext =
new TransactionContextImpl(closedSession, preparedTransactionId, rpc, 10);
when(closedSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(closedSession.newTransaction()).thenReturn(closedTransactionContext);
when(closedSession.beginTransaction()).thenThrow(sessionNotFound);
TransactionRunnerImpl closedTransactionRunner =
new TransactionRunnerImpl(closedSession, rpc, 10);
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);

final SessionImpl openSession = mock(SessionImpl.class);
when(openSession.asyncClose())
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
when(openSession.getName())
.thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open");
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
Expand Down
Loading

0 comments on commit ab25087

Please sign in to comment.