Skip to content

Commit

Permalink
feat: move session lastUseTime parameter from PooledSession to Sessio…
Browse files Browse the repository at this point in the history
…nImpl class. Fix updation of the parameter for chained RPCs within one transaction. (#2704)

* fix: prevent illegal negative timeout values into thread sleep() method while retrying exceptions in unit tests.

* For details on issue see - #2206

* Fixing lint issues.

* refactor: move session lastUseTime parameter from PooledSession to SessionImpl class. Fix updation of the parameter for chained RPCs within one transaction.

* chore: add clock instances in callees of SessionImpl.

* chore: partially fix failing unit tests in SessionPoolTest and SessionPoolMaintainerTest.

* chore: fix failing tests in SessionPoolStressTest.

* chore: update lastUseTime for methods in SessionPoolTransactionContext. Add a couple of unit tests for testing the new behaviour.

* chore: lint errors.

* chore: fix tests in DatabaseClientImplTest by passing the mocked clock instance.

* fix: update session lastUseTime field for AbstractReadContext class. Fix the unit test to test this change.

* fix: failing tests in TransactionRunnerImplTest.

* fix: failing test in SessionPoolMaintainerTest.

* refactor: move FakeClock to a new class.

* refactor: move Clock to a new class.

* chore: resolving PR comments.

* chore: address review comments.

* chore: updating lastUseTime state in TransactionRunnerImpl. Removing redundant updates from SessionPool class.

* chore: remove redundant update statements from SessionPool class. Add more unit tests.

* chore: add more tests for TransactionRunner.

* chore: remove dead code from constructor of SessionPoolTransactionContext.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Co-authored-by: Knut Olav Løite <[email protected]>

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Co-authored-by: Knut Olav Løite <[email protected]>

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Co-authored-by: Knut Olav Løite <[email protected]>

* chore: fixing precondition errors due to null clock.

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
arpan14 and gcf-owl-bot[bot] authored Nov 3, 2023
1 parent e320753 commit e75a281
Show file tree
Hide file tree
Showing 13 changed files with 1,009 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
Expand Down Expand Up @@ -72,6 +73,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();

Builder() {}

Expand Down Expand Up @@ -110,6 +112,11 @@ B setExecutorProvider(ExecutorProvider executorProvider) {
return self();
}

B setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
return self();
}

abstract T build();
}

Expand Down Expand Up @@ -392,6 +399,8 @@ void initTransaction() {
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

private final Clock clock;

@GuardedBy("lock")
private boolean isValid = true;

Expand All @@ -416,6 +425,7 @@ void initTransaction() {
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
}

@Override
Expand Down Expand Up @@ -689,6 +699,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
Expand Down Expand Up @@ -826,6 +837,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import org.threeten.bp.Instant;

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
* Clock.
*/
class Clock {
Instant instant() {
return Instant.now();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

/**
* Implementation of {@link Session}. Sessions are managed internally by the client library, and
Expand Down Expand Up @@ -98,12 +99,14 @@ interface SessionTransaction {
ByteString readyTransactionId;
private final Map<SpannerRpc.Option, ?> options;
private Span currentSpan;
private volatile Instant lastUseTime;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
this.options = options;
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
}

@Override
Expand All @@ -123,6 +126,14 @@ Span getCurrentSpan() {
return currentSpan;
}

Instant getLastUseTime() {
return lastUseTime;
}

void markUsed(Instant instant) {
lastUseTime = instant;
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
setActive(null);
Expand Down Expand Up @@ -385,6 +396,9 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
}

TransactionContextImpl newTransaction(Options options) {
// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
final Clock poolMaintainerClock =
spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
Expand All @@ -396,6 +410,7 @@ TransactionContextImpl newTransaction(Options options) {
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
.setSpan(currentSpan)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
.setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,6 @@ void maybeWaitOnMinSessions() {
}
}

/**
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
* Clock.
*/
static class Clock {
Instant instant() {
return Instant.now();
}
}

private abstract static class CachedResultSetSupplier implements Supplier<ResultSet> {
private ResultSet cached;

Expand Down Expand Up @@ -1370,7 +1360,6 @@ PooledSession get(final boolean eligibleForLongRunning) {

class PooledSession implements Session {
@VisibleForTesting SessionImpl delegate;
private volatile Instant lastUseTime;
private volatile SpannerException lastException;
private volatile boolean allowReplacing = true;

Expand Down Expand Up @@ -1409,7 +1398,9 @@ class PooledSession implements Session {
private PooledSession(SessionImpl delegate) {
this.delegate = delegate;
this.state = SessionState.AVAILABLE;
this.lastUseTime = clock.instant();

// initialise the lastUseTime field for each session.
this.markUsed();
}

int getChannel() {
Expand Down Expand Up @@ -1631,7 +1622,7 @@ private void markClosing() {
}

void markUsed() {
lastUseTime = clock.instant();
delegate.markUsed(clock.instant());
}

@Override
Expand Down Expand Up @@ -1827,7 +1818,7 @@ private void removeIdleSessions(Instant currTime) {
Iterator<PooledSession> iterator = sessions.descendingIterator();
while (iterator.hasNext()) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(minLastUseTime)) {
if (session.delegate.getLastUseTime().isBefore(minLastUseTime)) {
if (session.state != SessionState.CLOSING) {
boolean isRemoved = removeFromPool(session);
if (isRemoved) {
Expand Down Expand Up @@ -1929,7 +1920,8 @@ private void removeLongRunningSessions(
// collection is populated only when the get() method in {@code PooledSessionFuture} is
// called.
final PooledSession session = sessionFuture.get();
final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime);
final Duration durationFromLastUse =
Duration.between(session.delegate.getLastUseTime(), currentTime);
if (!session.eligibleForLongRunning
&& durationFromLastUse.compareTo(
inactiveTransactionRemovalOptions.getIdleTimeThreshold())
Expand Down Expand Up @@ -2327,7 +2319,7 @@ private PooledSession findSessionToKeepAlive(
&& (numChecked + numAlreadyChecked)
< (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(keepAliveThreshold)) {
if (session.delegate.getLastUseTime().isBefore(keepAliveThreshold)) {
iterator.remove();
return session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.google.cloud.spanner;

import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,19 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {

private Clock clock = new Clock();
private ByteString transactionId;
private Options options;
private boolean trackTransactionStarter;

private Builder() {}

Builder setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
return self();
}

Builder setTransactionId(ByteString transactionId) {
this.transactionId = transactionId;
return self();
Expand Down Expand Up @@ -189,13 +196,15 @@ public void removeListener(Runnable listener) {
volatile ByteString transactionId;

private CommitResponse commitResponse;
private final Clock clock;

private TransactionContextImpl(Builder builder) {
super(builder);
this.transactionId = builder.transactionId;
this.trackTransactionStarter = builder.trackTransactionStarter;
this.options = builder.options;
this.finishedAsyncOperations.set(null);
this.clock = builder.clock;
}

@Override
Expand Down Expand Up @@ -389,6 +398,7 @@ public void run() {
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture =
rpc.commitAsync(commitRequest, session.getOptions());
session.markUsed(clock.instant());
commitFuture.addListener(
tracer.withSpan(
opSpan,
Expand Down Expand Up @@ -463,12 +473,15 @@ ApiFuture<Empty> rollbackAsync() {
// is still in flight. That transaction will then automatically be terminated by the server.
if (transactionId != null) {
span.addAnnotation("Starting Rollback");
return rpc.rollbackAsync(
RollbackRequest.newBuilder()
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
session.getOptions());
ApiFuture<Empty> apiFuture =
rpc.rollbackAsync(
RollbackRequest.newBuilder()
.setSession(session.getName())
.setTransactionId(transactionId)
.build(),
session.getOptions());
session.markUsed(clock.instant());
return apiFuture;
} else {
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
Expand Down Expand Up @@ -723,6 +736,7 @@ private ResultSet internalExecuteUpdate(
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
if (resultSet.getMetadata().hasTransaction()) {
onTransactionMetadata(
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
Expand Down Expand Up @@ -753,6 +767,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
// commit.
increaseAsyncOperations();
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
session.markUsed(clock.instant());
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down Expand Up @@ -824,6 +839,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
try {
com.google.spanner.v1.ExecuteBatchDmlResponse response =
rpc.executeBatchDml(builder.build(), session.getOptions());
session.markUsed(clock.instant());
long[] results = new long[response.getResultSetsCount()];
for (int i = 0; i < response.getResultSetsCount(); ++i) {
results[i] = response.getResultSets(i).getStats().getRowCountExact();
Expand Down Expand Up @@ -863,6 +879,7 @@ public ApiFuture<long[]> batchUpdateAsync(
// commit.
increaseAsyncOperations();
response = rpc.executeBatchDmlAsync(builder.build(), session.getOptions());
session.markUsed(clock.instant());
} catch (Throwable t) {
decreaseAsyncOperations();
throw t;
Expand Down
Loading

0 comments on commit e75a281

Please sign in to comment.