diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index dda12b60d64..57feabbfcca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -20,6 +20,7 @@ import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.RequestOptions.Priority; import java.io.Serializable; +import java.time.Duration; import java.util.Objects; /** Specifies options for various spanner operations */ @@ -140,6 +141,11 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) { return new PriorityOption(priority); } + public static ReadQueryUpdateTransactionOption maxCommitDelay(Duration maxCommitDelay) { + Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive"); + return new MaxCommitDelayOption(maxCommitDelay); + } + /** * Specifying this will cause the reads, queries, updates and writes operations statistics * collection to be grouped by tag. @@ -247,6 +253,21 @@ void appendToOptions(Options options) { static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption(); + /** Option to request {@link MaxCommitDelayOption} for read/write transactions. */ + static final class MaxCommitDelayOption extends InternalOption + implements ReadQueryUpdateTransactionOption { + final Duration maxCommitDelay; + + MaxCommitDelayOption(Duration maxCommitDelay) { + this.maxCommitDelay = maxCommitDelay; + } + + @Override + void appendToOptions(Options options) { + options.maxCommitDelay = maxCommitDelay; + } + } + /** Option to request Optimistic Concurrency Control for read/write transactions. */ static final class OptimisticLockOption extends InternalOption implements TransactionOption { @Override @@ -354,6 +375,9 @@ void appendToOptions(Options options) { } private boolean withCommitStats; + + private Duration maxCommitDelay; + private Long limit; private Integer prefetchChunks; private Integer bufferRows; @@ -375,6 +399,14 @@ boolean withCommitStats() { return withCommitStats; } + boolean hasMaxCommitDelay() { + return maxCommitDelay != null; + } + + Duration maxCommitDelay() { + return maxCommitDelay; + } + boolean hasLimit() { return limit != null; } @@ -481,6 +513,9 @@ public String toString() { if (withCommitStats) { b.append("withCommitStats: ").append(withCommitStats).append(' '); } + if (maxCommitDelay != null) { + b.append("maxCommitDelay: ").append(maxCommitDelay).append(' '); + } if (limit != null) { b.append("limit: ").append(limit).append(' '); } @@ -533,6 +568,7 @@ public boolean equals(Object o) { Options that = (Options) o; return Objects.equals(withCommitStats, that.withCommitStats) + && Objects.equals(maxCommitDelay, that.maxCommitDelay) && (!hasLimit() && !that.hasLimit() || hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit())) && (!hasPrefetchChunks() && !that.hasPrefetchChunks() @@ -562,6 +598,9 @@ public int hashCode() { if (withCommitStats) { result = 31 * result + 1231; } + if (maxCommitDelay != null) { + result = 31 * result + maxCommitDelay.hashCode(); + } if (limit != null) { result = 31 * result + limit.hashCode(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 53bf37feb05..92e75e332e7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; import com.google.protobuf.Empty; import com.google.spanner.v1.BatchWriteRequest; import com.google.spanner.v1.BatchWriteResponse; @@ -60,6 +61,7 @@ * users need not be aware of the actual session management, pooling and handling. */ class SessionImpl implements Session { + private static final Tracer tracer = Tracing.getTracer(); /** Keep track of running transactions on this session per thread. */ @@ -86,8 +88,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) { * only have one such transaction active at a time. */ interface SessionTransaction { + /** Invalidates the transaction, generally because a new one has been started on the session. */ void invalidate(); + /** Registers the current span on the transaction. */ void setSpan(Span span); } @@ -176,16 +180,24 @@ public CommitResponse writeAtLeastOnceWithOptions( setActive(null); List mutationsProto = new ArrayList<>(); Mutation.toProto(mutations, mutationsProto); + Options options = Options.fromTransactionOptions(transactionOptions); final CommitRequest.Builder requestBuilder = CommitRequest.newBuilder() .setSession(name) - .setReturnCommitStats( - Options.fromTransactionOptions(transactionOptions).withCommitStats()) + .setReturnCommitStats(options.withCommitStats()) .addAllMutations(mutationsProto) .setSingleUseTransaction( TransactionOptions.newBuilder() .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())); + if (options.hasMaxCommitDelay()) { + requestBuilder.setMaxCommitDelay( + Duration.newBuilder() + .setSeconds(options.maxCommitDelay().getSeconds()) + .setNanos(options.maxCommitDelay().getNano()) + .build()); + } RequestOptions commitRequestOptions = getRequestOptions(transactionOptions); + if (commitRequestOptions != null) { requestBuilder.setRequestOptions(commitRequestOptions); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 21c74a400f0..5a0cd3618e8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -71,6 +71,7 @@ /** Default implementation of {@link TransactionRunner}. */ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { + private static final Tracer tracer = Tracing.getTracer(); private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName()); /** @@ -84,6 +85,7 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { + static class Builder extends AbstractReadContext.Builder { private Clock clock = new Clock(); @@ -131,6 +133,7 @@ static Builder newBuilder() { */ private class TransactionContextAsyncResultSetImpl extends ForwardingAsyncResultSet implements ListenableAsyncResultSet { + private TransactionContextAsyncResultSetImpl(ListenableAsyncResultSet delegate) { super(delegate); } @@ -339,6 +342,13 @@ ApiFuture commitAsync() { } builder.setRequestOptions(requestOptionsBuilder.build()); } + if (options.hasMaxCommitDelay()) { + builder.setMaxCommitDelay( + com.google.protobuf.Duration.newBuilder() + .setSeconds(options.maxCommitDelay().getSeconds()) + .setNanos(options.maxCommitDelay().getNano()) + .build()); + } synchronized (lock) { if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) { finishOps = SettableApiFuture.create(); @@ -354,6 +364,7 @@ ApiFuture commitAsync() { } private final class CommitRunnable implements Runnable { + private final SettableApiFuture res; private final ApiFuture prev; private final CommitRequest.Builder requestBuilder; @@ -575,7 +586,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude @Nullable String getTransactionTag() { - if (this.options.hasTag()) return this.options.tag(); + if (this.options.hasTag()) { + return this.options.tag(); + } return null; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 2c2d5a1a139..e527660cfa8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -133,6 +133,7 @@ @RunWith(JUnit4.class) public class DatabaseClientImplTest { + private static final String TEST_PROJECT = "my-project"; private static final String TEST_INSTANCE = "my-instance"; private static final String TEST_DATABASE = "my-database"; @@ -3635,6 +3636,112 @@ public void testAsyncTransactionManagerCommitWithPriority() { assertEquals(Priority.PRIORITY_HIGH, request.getRequestOptions().getPriority()); } + @Test + public void testCommitWithoutMaxCommitDelay() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + runner.run( + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; + }); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertFalse(request.hasMaxCommitDelay()); + } + + @Test + public void testCommitWithMaxCommitDelay() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = + client.readWriteTransaction(Options.maxCommitDelay(java.time.Duration.ofMillis(100))); + runner.run( + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; + }); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getMaxCommitDelay()); + assertEquals( + com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(), + request.getMaxCommitDelay()); + } + + @Test + public void testTransactionManagerCommitWithMaxCommitDelay() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionManager manager = + client.transactionManager(Options.maxCommitDelay(java.time.Duration.ofMillis(100))); + TransactionContext transaction = manager.begin(); + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + manager.commit(); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getMaxCommitDelay()); + assertEquals( + com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(), + request.getMaxCommitDelay()); + } + + @Test + public void testAsyncRunnerCommitWithMaxCommitDelay() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + AsyncRunner runner = client.runAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100))); + get( + runner.runAsync( + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor)); + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getMaxCommitDelay()); + assertEquals( + com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(), + request.getMaxCommitDelay()); + } + + @Test + public void testAsyncTransactionManagerCommitWithMaxCommitDelay() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (AsyncTransactionManager manager = + client.transactionManagerAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)))) { + TransactionContextFuture transaction = manager.beginAsync(); + get( + transaction + .then( + (txn, input) -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); + }, + executor) + .commitAsync()); + } + + List requests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertThat(requests).hasSize(1); + CommitRequest request = requests.get(0); + assertNotNull(request.getMaxCommitDelay()); + assertEquals( + com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(), + request.getMaxCommitDelay()); + } + @Test public void singleUseNoAction_ClearsCheckedOutSession() { DatabaseClientImpl client = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java index 51c47bd84cc..17f5f8e0ec9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java @@ -304,6 +304,22 @@ public void batchWriteAtLeastOnce() { } } + @Test + public void testWriteWithMaxCommitDelay() { + CommitResponse response = + client.writeWithOptions( + Collections.singletonList( + Mutation.newInsertOrUpdateBuilder("T") + .set("K") + .to(lastKey = uniqueString()) + .set("StringValue") + .to("v1") + .build()), + Options.maxCommitDelay(java.time.Duration.ofMillis(100))); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + } + @Test public void testWriteReturnsCommitStats() { assumeFalse("Emulator does not return commit statistics", isUsingEmulator());