diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index c67c008467..94b6de149c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -191,6 +191,13 @@ public void setUp() { .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .setTrackTransactionStarter() + // The extra BeginTransaction RPC for multiplexed session read-write is causing + // unexpected behavior in tests having a mock on the BeginTransaction RPC. Therefore, + // this is being skipped. + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setSkipVerifyingBeginTransactionForMuxRW(true) + .build()) .build() .getService(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java index 00e396c498..151ac89e3a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java @@ -72,6 +72,8 @@ public void testCommitAborted() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // verify that the there is no test record try (ResultSet rs = connection.executeQuery(Statement.of("SELECT COUNT(*) AS C FROM TEST WHERE ID=1"))) { @@ -112,6 +114,8 @@ public void testCommitAbortedDuringUpdateWithReturning() { AbortInterceptor interceptor = new AbortInterceptor(0); try (ITConnection connection = createConnection(interceptor, new CountTransactionRetryListener())) { + interceptor.setUsingMultiplexedSession( + isMultiplexedSessionsEnabledForRW(connection.getSpanner())); // verify that the there is no test record try (ResultSet rs = connection.executeQuery(Statement.of("SELECT COUNT(*) AS C FROM TEST WHERE ID=1"))) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java index fa3ab00b13..ac6a6ecbc7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java @@ -327,4 +327,11 @@ boolean isMultiplexedSessionsEnabled(Spanner spanner) { } return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); } + + boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) { + if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { + return false; + } + return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java index 5f2d88ac93..7bf6a670d9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java @@ -95,6 +95,8 @@ static ExecutionStep of(StatementExecutionStep step) { private boolean onlyInjectOnce = false; private final Random random = new Random(); + private boolean usingMultiplexedsession = false; + public AbortInterceptor(double probability) { Preconditions.checkArgument(probability >= 0.0D && probability <= 1.0D); this.probability = probability; @@ -110,6 +112,14 @@ public void setOnlyInjectOnce(boolean value) { this.onlyInjectOnce = value; } + /** + * Set this value to true if a multiplexed session is being used. Determining this directly from + * TransactionManagerImpl is challenging as it is a private class. + */ + public void setUsingMultiplexedSession(boolean value) { + this.usingMultiplexedsession = value; + } + protected boolean shouldAbort(String statement, ExecutionStep step) { return probability > random.nextDouble(); } @@ -133,27 +143,35 @@ public void intercept( return; } Class cls = Class.forName("com.google.cloud.spanner.TransactionManagerImpl"); - Class cls2 = - Class.forName("com.google.cloud.spanner.SessionPool$AutoClosingTransactionManager"); - Field delegateField = cls2.getDeclaredField("delegate"); - delegateField.setAccessible(true); - watch = watch.reset().start(); - while (delegateField.get(tx) == null && watch.elapsed(TimeUnit.MILLISECONDS) < 100) { - Thread.sleep(1L); - } - TransactionManager delegate = (TransactionManager) delegateField.get(tx); - if (delegate == null) { - return; + if (usingMultiplexedsession) { + Field stateField = cls.getDeclaredField("txnState"); + stateField.setAccessible(true); + tx.rollback(); + stateField.set(tx, TransactionState.ABORTED); + } else { + Class cls2 = + Class.forName( + "com.google.cloud.spanner.SessionPool$AutoClosingTransactionManager"); + Field delegateField = cls2.getDeclaredField("delegate"); + delegateField.setAccessible(true); + watch = watch.reset().start(); + while (delegateField.get(tx) == null && watch.elapsed(TimeUnit.MILLISECONDS) < 100) { + Thread.sleep(1L); + } + TransactionManager delegate = (TransactionManager) delegateField.get(tx); + if (delegate == null) { + return; + } + Field stateField = cls.getDeclaredField("txnState"); + stateField.setAccessible(true); + + // First rollback the delegate, and then pretend it aborted. + // We should call rollback on the delegate and not the wrapping + // AutoClosingTransactionManager, as the latter would cause the session to be returned + // to the session pool. + delegate.rollback(); + stateField.set(delegate, TransactionState.ABORTED); } - Field stateField = cls.getDeclaredField("txnState"); - stateField.setAccessible(true); - - // First rollback the delegate, and then pretend it aborted. - // We should call rollback on the delegate and not the wrapping - // AutoClosingTransactionManager, as the latter would cause the session to be returned - // to the session pool. - delegate.rollback(); - stateField.set(delegate, TransactionState.ABORTED); } catch (Exception e) { throw new RuntimeException(e); }