diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 5d3beb2e7b2..f7893cd83d4 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -631,4 +631,11 @@ com/google/cloud/spanner/connection/Connection void setDirectedRead(com.google.spanner.v1.DirectedReadOptions) + + + 7005 + com/google/cloud/spanner/DatabaseClient + long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$UpdateOption[]) + long executePartitionedUpdate(com.google.cloud.spanner.Statement, com.google.cloud.spanner.Options$PartitionedUpdateOption[]) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 06237131458..68cc604cfd8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -18,6 +18,7 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; @@ -600,5 +601,5 @@ ServerStream batchWriteAtLeastOnce( *

Given the above, Partitioned DML is good fit for large, database-wide, operations that are * idempotent, such as deleting old rows from a very large table. */ - long executePartitionedUpdate(Statement stmt, UpdateOption... options); + long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index b63ad379305..f19d492b492 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -18,8 +18,8 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.common.annotations.VisibleForTesting; @@ -240,7 +240,8 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti } @Override - public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { + public long executePartitionedUpdate( + final Statement stmt, final PartitionedUpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 3dbd0c1cda3..ca60eea4c96 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -61,9 +61,6 @@ public interface ReadOption {} public interface ReadQueryUpdateTransactionOption extends ReadOption, QueryOption, UpdateOption, TransactionOption {} - /** Marker interface to mark options applicable to Update and Write operations */ - public interface UpdateTransactionOption extends UpdateOption, TransactionOption {} - /** * Marker interface to mark options applicable to Create, Update and Delete operations in admin * API. @@ -86,8 +83,15 @@ public interface QueryOption {} /** Marker interface to mark options applicable to write operations */ public interface TransactionOption {} + /** Marker interface to mark options applicable to partitioned update */ + public interface PartitionedUpdateOption {} + /** Marker interface to mark options applicable to update operation. */ - public interface UpdateOption {} + public interface UpdateOption extends PartitionedUpdateOption {} + + /** Marker interface to mark options applicable to partitioned update and write operations */ + public interface PartitionedUpdateTransactionOption + extends PartitionedUpdateOption, TransactionOption {} /** Marker interface to mark options applicable to list operations in admin API. */ public interface ListOption {} @@ -118,7 +122,7 @@ public static TransactionOption optimisticLock() { * being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or * unset. */ - public static UpdateTransactionOption excludeTxnFromChangeStreams() { + public static PartitionedUpdateTransactionOption excludeTxnFromChangeStreams() { return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION; } @@ -297,7 +301,7 @@ void appendToOptions(Options options) { /** Option to request the transaction to be excluded from change streams. */ static final class ExcludeTxnFromChangeStreamsOption extends InternalOption - implements UpdateTransactionOption { + implements PartitionedUpdateTransactionOption { @Override void appendToOptions(Options options) { options.withExcludeTxnFromChangeStreams = true; @@ -744,6 +748,16 @@ static Options fromUpdateOptions(UpdateOption... options) { return updateOptions; } + static Options fromPartitinoedUpdateOptions(PartitionedUpdateOption... options) { + Options partitionedUpdateOptions = new Options(); + for (PartitionedUpdateOption option : options) { + if (option instanceof InternalOption) { + ((InternalOption) option).appendToOptions(partitionedUpdateOptions); + } + } + return partitionedUpdateOptions; + } + static Options fromTransactionOptions(TransactionOption... options) { Options transactionOptions = new Options(); for (TransactionOption option : options) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index d498bb232a1..c811cedb845 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -25,7 +25,7 @@ import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; -import com.google.cloud.spanner.Options.UpdateOption; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -71,7 +71,9 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction * last seen resume token if the server returns any. */ long executeStreamingPartitionedUpdate( - final Statement statement, final Duration timeout, final UpdateOption... updateOptions) { + final Statement statement, + final Duration timeout, + final PartitionedUpdateOption... partitionedUpdateOptions) { checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement"); @@ -79,7 +81,7 @@ long executeStreamingPartitionedUpdate( boolean foundStats = false; long updateCount = 0L; Stopwatch stopwatch = Stopwatch.createStarted(ticker); - Options options = Options.fromUpdateOptions(updateOptions); + Options options = Options.fromPartitinoedUpdateOptions(partitionedUpdateOptions); try { ExecuteSqlRequest request = newTransactionRequestFrom(statement, options); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 83ee2812721..215b16f5bfb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -26,8 +26,8 @@ import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -140,7 +140,7 @@ void markUsed(Instant instant) { } @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 8058802a8fc..ec58a728c04 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -52,6 +52,7 @@ import com.google.cloud.Tuple; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; +import com.google.cloud.spanner.Options.PartitionedUpdateOption; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.Options.TransactionOption; @@ -1270,7 +1271,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti } @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) { try { return get(true).executePartitionedUpdate(stmt, options); } finally { @@ -1470,7 +1471,7 @@ public ServerStream batchWriteAtLeastOnce( } @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) + public long executePartitionedUpdate(Statement stmt, PartitionedUpdateOption... options) throws SpannerException { try { markUsed(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 8c9a5d957e8..5c223c80eb5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -460,6 +460,37 @@ public void testUpdateOptionsWithPriorityHashCode() { assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode()); } + @Test + public void testPartitionedUpdateOptions() { + Options option1 = Options.fromPartitinoedUpdateOptions(); + Options option2 = Options.fromPartitinoedUpdateOptions(); + assertEquals(option1, option2); + assertEquals(option1.hashCode(), option2.hashCode()); + assertEquals(option1.toString(), option2.toString()); + assertEquals("", option1.toString()); + } + + @Test + public void testPartitionedUpdateOptionsWithPriority() { + Options optionsWithHighPriority1 = + Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH)); + assertEquals(Priority.PRIORITY_HIGH, optionsWithHighPriority1.priority()); + assertEquals("priority: HIGH ", optionsWithHighPriority1.toString()); + + Options optionsWithHighPriority2 = + Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.HIGH)); + assertEquals(optionsWithHighPriority1, optionsWithHighPriority2); + assertEquals(optionsWithHighPriority1.hashCode(), optionsWithHighPriority2.hashCode()); + assertEquals(optionsWithHighPriority1.toString(), optionsWithHighPriority2.toString()); + + Options optionsWithMediumPriority = + Options.fromPartitinoedUpdateOptions(Options.priority(RpcPriority.MEDIUM)); + assertEquals(Priority.PRIORITY_MEDIUM, optionsWithMediumPriority.priority()); + assertEquals("priority: MEDIUM ", optionsWithMediumPriority.toString()); + assertNotEquals(optionsWithHighPriority1, optionsWithMediumPriority); + assertNotEquals(optionsWithHighPriority1.hashCode(), optionsWithMediumPriority.hashCode()); + } + @Test public void testQueryOptionsEquality() { Options option1 = Options.fromQueryOptions(); @@ -617,6 +648,26 @@ public void updateEquality() { assertThat(o2.equals(o3)).isFalse(); } + @Test + public void partitionedUpdateWithTag() { + String tag1 = "app=spanner,env=test"; + Options o1 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1)); + assertEquals(tag1, o1.tag()); + assertEquals("tag: " + tag1 + " ", o1.toString()); + + Options o2 = Options.fromPartitinoedUpdateOptions(Options.tag(tag1)); + assertEquals(o1, o2); + assertEquals(o1.hashCode(), o2.hashCode()); + assertEquals(o1.toString(), o2.toString()); + + String tag2 = "app=spanner,env=stage"; + Options o3 = Options.fromPartitinoedUpdateOptions(Options.tag(tag2)); + assertEquals("tag: " + tag2 + " ", o3.toString()); + assertNotEquals(o2, o3); + assertNotEquals(o2.hashCode(), o3.hashCode()); + assertNotEquals(o2.toString(), o3.toString()); + } + @Test public void transactionOptionsTest() { String tag = "app=spanner,env=test"; @@ -706,17 +757,17 @@ public void transactionOptionsExcludeTxnFromChangeStreams() { assertNotEquals(option1.hashCode(), option3.hashCode()); assertTrue(option1.withExcludeTxnFromChangeStreams()); - assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString()); assertNull(option3.withExcludeTxnFromChangeStreams()); - assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + assertEquals("", option3.toString()); } @Test - public void updateOptionsExcludeTxnFromChangeStreams() { - Options option1 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); - Options option2 = Options.fromUpdateOptions(Options.excludeTxnFromChangeStreams()); - Options option3 = Options.fromUpdateOptions(); + public void partitionedUpdateOptionsExcludeTxnFromChangeStreams() { + Options option1 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option2 = Options.fromPartitinoedUpdateOptions(Options.excludeTxnFromChangeStreams()); + Options option3 = Options.fromPartitinoedUpdateOptions(); assertEquals(option1, option2); assertEquals(option1.hashCode(), option2.hashCode()); @@ -724,9 +775,9 @@ public void updateOptionsExcludeTxnFromChangeStreams() { assertNotEquals(option1.hashCode(), option3.hashCode()); assertTrue(option1.withExcludeTxnFromChangeStreams()); - assertThat(option1.toString()).contains("withExcludeTxnFromChangeStreams: true"); + assertEquals("withExcludeTxnFromChangeStreams: true ", option1.toString()); assertNull(option3.withExcludeTxnFromChangeStreams()); - assertThat(option3.toString()).doesNotContain("withExcludeTxnFromChangeStreams: true"); + assertEquals("", option3.toString()); } }