Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): add precommit token support for mux R/W #3341

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -878,10 +879,19 @@ String getTransactionTag() {
return null;
}

@Nullable
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
return null;
}

/** This method is called when a statement returned a new transaction as part of its results. */
@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

/** This method is called when a response returns a new pre-commit token as part of its results. */
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token){}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
this.session.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
Expand All @@ -52,6 +53,8 @@ interface Listener {
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

void onPrecommitToken(MultiplexedSessionPrecommitToken token);

/** Called when the read finishes with an error. Returns the error that should be thrown. */
SpannerException onError(SpannerException e, boolean withBeginTransaction);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
public TransactionRunner readWriteTransaction(TransactionOption... options) {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
return getSession().readWriteTransaction(options);
return getMultiplexedSession().readWriteTransaction(options);
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
Expand All @@ -44,9 +46,12 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
private MultiplexedSessionPrecommitToken precommitToken;
private final Listener listener;

GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
this.stream = stream;
this.listener = listener;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -154,6 +159,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
// Collect precommit token from all PartialResultSet
if(current.hasPrecommitToken()) {
listener.onPrecommitToken(current.getPrecommitToken());
}
if (current.hasStats()) {
statistics = current.getStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
return createMultiplexedSessionTransaction(false).readOnlyTransaction(bound);
}

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
return createMultiplexedSessionTransaction(false).readWriteTransaction(options);
}

/**
* It is enough with one executor to maintain the multiplexed sessions in all the clients, as they
* do not need to be updated often, and the maintenance task is light. The core pool size is set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,15 +426,18 @@ public void close() {
}
}

ApiFuture<ByteString> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
ApiFuture<Transaction> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint, com.google.spanner.v1.Mutation mutation) {
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.build();
if (sessionReference.getIsMultiplexed()) {
request = request.toBuilder().setMutationKey(mutation).build();
}
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
Expand All @@ -448,7 +451,7 @@ ApiFuture<ByteString> beginTransactionAsync(
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
}
span.end();
res.set(txn.getId());
res.set(txn);
} catch (ExecutionException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -171,6 +172,11 @@ public void removeListener(Runnable listener) {
@GuardedBy("committingLock")
private volatile boolean committing;

private final Object preCommitTokenLock = new Object();

@GuardedBy("preCommitTokenLock")
private MultiplexedSessionPrecommitToken latestPreCommitToken;

@GuardedBy("lock")
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();

Expand Down Expand Up @@ -268,7 +274,7 @@ void ensureTxn() {
ApiFuture<Void> ensureTxnAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
if (transactionId == null || isAborted()) {
createTxnAsync(res);
createTxnAsync(res, null);
} else {
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
txnLogger.log(
Expand All @@ -280,19 +286,23 @@ ApiFuture<Void> ensureTxnAsync() {
return res;
}

private void createTxnAsync(final SettableApiFuture<Void> res) {
private void createTxnAsync(final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut =
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
final ApiFuture<Transaction> fut =
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint(), mutation);
fut.addListener(
() -> {
try {
transactionId = fut.get();
Transaction txn = fut.get();
transactionId = txn.getId();
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
txnLogger.log(
Level.FINER,
"Started transaction {0}",
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
if (txn.hasPrecommitToken()) {
onPrecommitToken(txn.getPrecommitToken());
}
res.set(null);
} catch (ExecutionException e) {
span.addAnnotation(
Expand Down Expand Up @@ -370,7 +380,7 @@ ApiFuture<CommitResponse> commitAsync() {
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
createTxnAsync(finishOps);
createTxnAsync(finishOps, mutationsProto.get(0));
} else {
finishOps = finishedAsyncOperations;
}
Expand Down Expand Up @@ -423,6 +433,13 @@ public void run() {
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
if (session.getIsMultiplexed()) {
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
System.out.println("setting precommit token in request");
System.out.println(requestBuilder.getPrecommitToken().getPrecommitToken());

txnLogger.log(Level.ALL, "setting precommit token to commit "+requestBuilder.getPrecommitToken().getPrecommitToken());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
Expand Down Expand Up @@ -625,6 +642,17 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

public void onPrecommitToken(MultiplexedSessionPrecommitToken token){
if(token == null) return;
synchronized (preCommitTokenLock) {
if (this.latestPreCommitToken == null || token.getSeqNum() > this.latestPreCommitToken.getSeqNum()) {
this.latestPreCommitToken = token;
System.out.println("Updating precommit token to "+this.latestPreCommitToken);
txnLogger.log(Level.ALL, "Updating precommit token to "+this.latestPreCommitToken);
}
}
}

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) {
Expand All @@ -633,6 +661,13 @@ String getTransactionTag() {
return null;
}

@Nullable
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
synchronized (preCommitTokenLock) {
return this.latestPreCommitToken;
}
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
e = super.onError(e, withBeginTransaction);
Expand Down Expand Up @@ -811,6 +846,9 @@ private ResultSet internalExecuteUpdate(
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
if (resultSet.hasPrecommitToken()) {
onPrecommitToken(resultSet.getPrecommitToken());
}
return resultSet;
} catch (Throwable t) {
throw onError(
Expand Down Expand Up @@ -885,6 +923,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
if (resultSet.get().hasPrecommitToken()) {
onPrecommitToken(resultSet.get().getPrecommitToken());
}
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
Expand Down Expand Up @@ -938,6 +979,10 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
response.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
// TODO(harsha): check if we need to get precommit_token from response.getResultSets
if (response.hasPrecommitToken()) {
onPrecommitToken(response.getPrecommitToken());
}
}

// If one of the DML statements was aborted, we should throw an aborted exception.
Expand Down Expand Up @@ -1004,6 +1049,9 @@ public ApiFuture<long[]> batchUpdateAsync(
builder.getTransaction().hasBegin());
}
}
if (batchDmlResponse.hasPrecommitToken()) {
onPrecommitToken(batchDmlResponse.getPrecommitToken());
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,7 @@ public final Transaction beginTransaction(String session, TransactionOptions opt
* SessionName.of("[PROJECT]", "[INSTANCE]", "[DATABASE]", "[SESSION]").toString())
* .setOptions(TransactionOptions.newBuilder().build())
* .setRequestOptions(RequestOptions.newBuilder().build())
* .setMutationKey(Mutation.newBuilder().build())
* .build();
* Transaction response = spannerClient.beginTransaction(request);
* }
Expand Down Expand Up @@ -1662,6 +1663,7 @@ public final Transaction beginTransaction(BeginTransactionRequest request) {
* SessionName.of("[PROJECT]", "[INSTANCE]", "[DATABASE]", "[SESSION]").toString())
* .setOptions(TransactionOptions.newBuilder().build())
* .setRequestOptions(RequestOptions.newBuilder().build())
* .setMutationKey(Mutation.newBuilder().build())
* .build();
* ApiFuture<Transaction> future = spannerClient.beginTransactionCallable().futureCall(request);
* // Do something.
Expand Down Expand Up @@ -1911,6 +1913,7 @@ public final CommitResponse commit(
* .setReturnCommitStats(true)
* .setMaxCommitDelay(Duration.newBuilder().build())
* .setRequestOptions(RequestOptions.newBuilder().build())
* .setPrecommitToken(MultiplexedSessionPrecommitToken.newBuilder().build())
* .build();
* CommitResponse response = spannerClient.commit(request);
* }
Expand Down Expand Up @@ -1955,6 +1958,7 @@ public final CommitResponse commit(CommitRequest request) {
* .setReturnCommitStats(true)
* .setMaxCommitDelay(Duration.newBuilder().build())
* .setRequestOptions(RequestOptions.newBuilder().build())
* .setPrecommitToken(MultiplexedSessionPrecommitToken.newBuilder().build())
* .build();
* ApiFuture<CommitResponse> future = spannerClient.commitCallable().futureCall(request);
* // Do something.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@
* <p>The builder of this class is recursive, so contained classes are themselves builders. When
* build() is called, the tree of builders is called to create the complete settings object.
*
* <p>For example, to set the total timeout of createSession to 30 seconds:
* <p>For example, to set the
* [RetrySettings](https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings)
* of createSession:
*
* <pre>{@code
* // This snippet has been automatically generated and should be regarded as a code template only.
Expand All @@ -92,10 +94,21 @@
* .createSessionSettings()
* .getRetrySettings()
* .toBuilder()
* .setTotalTimeout(Duration.ofSeconds(30))
* .setInitialRetryDelayDuration(Duration.ofSeconds(1))
* .setInitialRpcTimeoutDuration(Duration.ofSeconds(5))
* .setMaxAttempts(5)
* .setMaxRetryDelayDuration(Duration.ofSeconds(30))
* .setMaxRpcTimeoutDuration(Duration.ofSeconds(60))
* .setRetryDelayMultiplier(1.3)
* .setRpcTimeoutMultiplier(1.5)
* .setTotalTimeoutDuration(Duration.ofSeconds(300))
* .build());
* SpannerSettings spannerSettings = spannerSettingsBuilder.build();
* }</pre>
*
* Please refer to the [Client Side Retry
* Guide](https://github.com/googleapis/google-cloud-java/blob/main/docs/client_retries.md) for
* additional support in setting retries.
*/
@Generated("by gapic-generator-java")
public class SpannerSettings extends ClientSettings<SpannerSettings> {
Expand Down
Loading
Loading